线程间通信是Java线程必须掌握的课程之一。
线程间的通信的前提是,必须要保证线程还活着,可以使用阻塞
方法,将线程暂时阻塞起来,当满足一定条件时,通知
被阻塞
的线程继续执行。
常用的阻塞方法有:sleep、wait、CountDownLatch,sleep只能实现线程暂时停止执行,并不能做到通知的目的,CountDownLatch 是一个非常不错的方法,在《Java线程》系列的后面的文章中会重点说明,并不是本章的重点。
本章的重点是对 wait 的使用和介绍,与 wait 对应的还有notify
和notifyAll
,notify
和notifyAll
具有通知的作用。
(1)生产者和消费者
在多线程间通信的过程就是生产者和消费者的过程,在这里需要掌握 生产者/消费者设计模式
的架构,在掌握这种模式的基础上,我们再来思考线程间通信遇到的问题。
生产者/消费者设计模式
的架构模型可以查询其它资料,本系列不做重点说明。
(2)简单线程通信
将生产者和消费者放在同一个线程处理,即同步处理,就以售票为例:
import java.util.concurrent.TimeUnit;
public class Test {
private static TicketInfo ticketInfo;
/**
* 购票
* @param info 票的信息
*/
private static void buyTicket(TicketInfo info) throws InterruptedException {
ticketInfo = info;
TimeUnit.SECONDS.sleep(3); // 假设购票的时间为3秒
}
/**
* 售票
*/
private static TicketInfo sellTicket() throws InterruptedException {
if (ticketInfo == null) {
return null;
}
ticketInfo.setAcceptance(true); // 已受理
TimeUnit.SECONDS.sleep(3); // 假设售票的时间为3秒
return ticketInfo;
}
public static void main(String[] args) throws InterruptedException {
TicketInfo ticketInfo = new TicketInfo();
ticketInfo.setTicketID("123");
ticketInfo.setAddress("123");
ticketInfo.setAge(10);
ticketInfo.setId("001");
ticketInfo.setName("张三");
ticketInfo.setAcceptance(false); // 是否受理
buyTicket(ticketInfo); // 购票
TicketInfo ticket = sellTicket(); // 售票
System.out.println("是否受理:" + ticket.isAcceptance());
}
}
以上代码可以实现购票和售票的业务,购票时间为3秒,售票时间为3秒,在不考虑其它开销的情况下,购票和售票加起来需要6秒钟时间。假如,同一时间,有多人买票呢? 在多人买票的情况下,必须要等到前面一个人完成购票和售票的全部流程才可以。
为了解决这个问题,我们第一步需要做的就是将 购票 和 售票分离,分别用不同的线程来管理,代码如下:
public class Test {
private static TicketInfo ticketInfo;
/**
* 购票
* @param info 票的信息
*/
private static void buyTicket(TicketInfo info) throws InterruptedException {
ticketInfo = info;
synchronized (ticketInfo) {
TimeUnit.SECONDS.sleep(3); // 假设购票的时间为3秒
ticketInfo.wait(); // 等待
}
System.out.println(ticketInfo.getName() + "已经卖到票了");
}
/**
* 售票
*/
private static TicketInfo sellTicket() throws InterruptedException {
if (ticketInfo == null) {
return null;
}
synchronized (ticketInfo) {
ticketInfo.setAcceptance(true); // 已受理
TimeUnit.SECONDS.sleep(3); // 假设售票的时间为3秒
ticketInfo.notify(); // 通知
}
return ticketInfo;
}
public static void main(String[] args) {
TicketInfo ticketInfo = new TicketInfo();
ticketInfo.setTicketID("123");
ticketInfo.setAddress("123");
ticketInfo.setAge(10);
ticketInfo.setId("001");
ticketInfo.setName("张三");
ticketInfo.setAcceptance(false); // 是否受理
new Thread(new Runnable() {
@Override
public void run() {
try {
buyTicket(ticketInfo); // 购票
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
TicketInfo ticket = sellTicket(); // 售票
System.out.println("是否受理:" + ticket.isAcceptance());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
请注意,购票和售票的共享对象是TicketInfo,需要使用:
ticketInfo.wait(); // 等待
ticketInfo.notify(); // 通知
如果调用了TicketInfo的wait、notify方法,必须对该对象添加同步锁(synchronized (ticketInfo)),否则直接异常。wait 和 notify 还有一些主要事项,我已经全部梳理出来了:
(1)wait 方法是可中断方法,也就是说,当前线程一旦调用了 wait 方法进入阻塞状态,
其他线程是可以使用 interrput 方法将其打断的,打断之后,可以捕获到中断异常 InterruptedException;
(2)线程执行了某个对象的 wait 方法以后,会加入与之对应 wait set 中,每一个对象的 monitor 都有一个与之关联的 wait set;
(3)当前线程进入 wait set 之后,notify 方法可以将其唤醒,也就是从 wait set 中弹出,同时中断 wait 中的线程也会将其唤醒;
(4)就是上面提到的,必须在同步方法中使用 wait 和 notify 方法,因为执行 wait 和 notify 的前提条件是必须持有同步方法的 monitor 的所有权, 否则抛出 IllegalMonitorStateException 异常。
(3)多线程通信
如果有多人买票,就有可能有多张票没有来得及受理,那么,这些票需要用一个缓冲列表来管理,这里使用 LinkedBlockingDeque 来管理它们,整体后的代码如下:
import java.util.concurrent.LinkedBlockingDeque;
public class Test {
private static LinkedBlockingDeque<TicketInfo> deque = new LinkedBlockingDeque<>(10);
/**
* 购票
* @param info 票的信息
*/
private static void buyTicket(TicketInfo info) throws InterruptedException {
synchronized (deque) {
if (deque.size() >= 10) {
deque.wait();
} else {
deque.add(info);
System.out.println(info.getName() + "已经卖到票了");
deque.notifyAll();
}
}
}
/**
* 售票
*/
private static void sellTicket() throws InterruptedException {
while (true) {
synchronized (deque) {
if (deque.isEmpty()) {
deque.wait();
}
TicketInfo ticketInfo = deque.removeFirst();
if (ticketInfo != null) {
System.out.println(ticketInfo.getName() + "的买票流程已经得到受理");
}
deque.notifyAll();
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(new Runnable() {
@Override
public void run() {
TicketInfo ticketInfo = new TicketInfo();
ticketInfo.setTicketID("123");
ticketInfo.setAddress("123");
ticketInfo.setAge(10);
ticketInfo.setId("001");
ticketInfo.setName(String.valueOf(finalI + 1));
try {
buyTicket(ticketInfo); // 购票
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
new Thread(new Runnable() {
@Override
public void run() {
try {
sellTicket(); // 售票
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
TicketInfo 由 LinkedBlockingDeque 统一来管理,买票其实就是将 TicketInfo 对象放到 LinkedBlockingDeque 中,售票其实就是将 TicketInfo 从LinkedBlockingDeque 中删除。
由于可能存在多个线程处于等待状态,所以唤醒的时候必须使用 notifyAll 来唤醒。
以上代码的打印结果为:
8已经卖到票了
8的买票流程已经得到受理
4已经卖到票了
5已经卖到票了
9已经卖到票了
2已经卖到票了
7已经卖到票了
1已经卖到票了
10已经卖到票了
3已经卖到票了
6已经卖到票了
4的买票流程已经得到受理
5的买票流程已经得到受理
9的买票流程已经得到受理
2的买票流程已经得到受理
7的买票流程已经得到受理
1的买票流程已经得到受理
10的买票流程已经得到受理
3的买票流程已经得到受理
6的买票流程已经得到受理
以上代码用for循环不停的 new Thread,这是不可取的,应该改成线程池,线程池的讲解会在本系列后面的篇章会有针对性地说明。
下面说明一下线程休息室的基本常识:
执行 wait 方法之后被加入到与 monitor 相关联的 wait set 中,wait set 就线程休息室,当另一个线程调用该 monitor 的 notify 方法之后,
其中一个线程会从 wait set 中弹出;
如果执行 notifyAll 则不需要考虑是哪个线程被弹出,因为 wait set 中的所有 wait 线程都会被弹出。
[本章完...]