Java线程<第四篇>:线程间通信
2024-07-03 00:00:03  阅读数 475

线程间通信是Java线程必须掌握的课程之一。
线程间的通信的前提是,必须要保证线程还活着,可以使用阻塞方法,将线程暂时阻塞起来,当满足一定条件时,通知阻塞的线程继续执行。
常用的阻塞方法有:sleep、wait、CountDownLatch,sleep只能实现线程暂时停止执行,并不能做到通知的目的,CountDownLatch 是一个非常不错的方法,在《Java线程》系列的后面的文章中会重点说明,并不是本章的重点。
本章的重点是对 wait 的使用和介绍,与 wait 对应的还有 notifynotifyAllnotifynotifyAll 具有通知的作用。

(1)生产者和消费者

在多线程间通信的过程就是生产者和消费者的过程,在这里需要掌握 生产者/消费者设计模式 的架构,在掌握这种模式的基础上,我们再来思考线程间通信遇到的问题。

生产者/消费者设计模式 的架构模型可以查询其它资料,本系列不做重点说明。

(2)简单线程通信

将生产者和消费者放在同一个线程处理,即同步处理,就以售票为例:

import java.util.concurrent.TimeUnit;

public class Test {

    private static TicketInfo ticketInfo;

    /**
     * 购票
     * @param info 票的信息
     */
    private static void buyTicket(TicketInfo info) throws InterruptedException {
        ticketInfo = info;
        TimeUnit.SECONDS.sleep(3); // 假设购票的时间为3秒
    }

    /**
     * 售票
     */
    private static TicketInfo sellTicket() throws InterruptedException {
        if (ticketInfo == null) {
            return null;
        }
        ticketInfo.setAcceptance(true); // 已受理
        TimeUnit.SECONDS.sleep(3); // 假设售票的时间为3秒
        return ticketInfo;
    }

    public static void main(String[] args) throws InterruptedException {

        TicketInfo ticketInfo = new TicketInfo();
        ticketInfo.setTicketID("123");
        ticketInfo.setAddress("123");
        ticketInfo.setAge(10);
        ticketInfo.setId("001");
        ticketInfo.setName("张三");
        ticketInfo.setAcceptance(false); // 是否受理

        buyTicket(ticketInfo); // 购票

        TicketInfo ticket = sellTicket(); // 售票

        System.out.println("是否受理:" + ticket.isAcceptance());

    }

}

以上代码可以实现购票和售票的业务,购票时间为3秒,售票时间为3秒,在不考虑其它开销的情况下,购票和售票加起来需要6秒钟时间。假如,同一时间,有多人买票呢? 在多人买票的情况下,必须要等到前面一个人完成购票和售票的全部流程才可以。

为了解决这个问题,我们第一步需要做的就是将 购票 和 售票分离,分别用不同的线程来管理,代码如下:

public class Test {

    private static TicketInfo ticketInfo;

    /**
     * 购票
     * @param info 票的信息
     */
    private static void buyTicket(TicketInfo info) throws InterruptedException {
        ticketInfo = info;
        synchronized (ticketInfo) {
            TimeUnit.SECONDS.sleep(3); // 假设购票的时间为3秒
            ticketInfo.wait(); // 等待
        }
        System.out.println(ticketInfo.getName() + "已经卖到票了");
    }

    /**
     * 售票
     */
    private static TicketInfo sellTicket() throws InterruptedException {
        if (ticketInfo == null) {
            return null;
        }
        synchronized (ticketInfo) {
            ticketInfo.setAcceptance(true); // 已受理
            TimeUnit.SECONDS.sleep(3); // 假设售票的时间为3秒
            ticketInfo.notify(); // 通知
        }
        return ticketInfo;
    }

    public static void main(String[] args) {

        TicketInfo ticketInfo = new TicketInfo();
        ticketInfo.setTicketID("123");
        ticketInfo.setAddress("123");
        ticketInfo.setAge(10);
        ticketInfo.setId("001");
        ticketInfo.setName("张三");
        ticketInfo.setAcceptance(false); // 是否受理
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    buyTicket(ticketInfo); // 购票
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TicketInfo ticket = sellTicket(); // 售票
                    System.out.println("是否受理:" + ticket.isAcceptance());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}

请注意,购票和售票的共享对象是TicketInfo,需要使用:

ticketInfo.wait(); // 等待
ticketInfo.notify(); // 通知

如果调用了TicketInfo的wait、notify方法,必须对该对象添加同步锁(synchronized (ticketInfo)),否则直接异常。wait 和 notify 还有一些主要事项,我已经全部梳理出来了:

(1)wait 方法是可中断方法,也就是说,当前线程一旦调用了 wait 方法进入阻塞状态,
    其他线程是可以使用 interrput 方法将其打断的,打断之后,可以捕获到中断异常 InterruptedException;
(2)线程执行了某个对象的 wait 方法以后,会加入与之对应 wait set 中,每一个对象的 monitor 都有一个与之关联的 wait set;
(3)当前线程进入 wait set 之后,notify 方法可以将其唤醒,也就是从 wait set 中弹出,同时中断 wait 中的线程也会将其唤醒;
(4)就是上面提到的,必须在同步方法中使用 wait 和 notify 方法,因为执行 wait 和 notify 的前提条件是必须持有同步方法的 monitor 的所有权, 否则抛出 IllegalMonitorStateException 异常。

(3)多线程通信

如果有多人买票,就有可能有多张票没有来得及受理,那么,这些票需要用一个缓冲列表来管理,这里使用 LinkedBlockingDeque 来管理它们,整体后的代码如下:

import java.util.concurrent.LinkedBlockingDeque;

public class Test {

    private static LinkedBlockingDeque<TicketInfo> deque = new LinkedBlockingDeque<>(10);

    /**
     * 购票
     * @param info 票的信息
     */
    private static void buyTicket(TicketInfo info) throws InterruptedException {
        synchronized (deque) {
            if (deque.size() >= 10) {
                deque.wait();
            } else {
                deque.add(info);
                System.out.println(info.getName() + "已经卖到票了");
                deque.notifyAll();
            }
        }
    }

    /**
     * 售票
     */
    private static void sellTicket() throws InterruptedException {
        while (true) {
            synchronized (deque) {
                if (deque.isEmpty()) {
                    deque.wait();
                }
                TicketInfo ticketInfo = deque.removeFirst();
                if (ticketInfo != null) {
                    System.out.println(ticketInfo.getName() + "的买票流程已经得到受理");
                }
                deque.notifyAll();
            }
        }
    }

    public static void main(String[] args) {

        for (int i = 0; i < 10; i++) {
            int finalI = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    TicketInfo ticketInfo = new TicketInfo();
                    ticketInfo.setTicketID("123");
                    ticketInfo.setAddress("123");
                    ticketInfo.setAge(10);
                    ticketInfo.setId("001");
                    ticketInfo.setName(String.valueOf(finalI + 1));
                    try {
                        buyTicket(ticketInfo); // 购票
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    sellTicket(); // 售票
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

TicketInfo 由 LinkedBlockingDeque 统一来管理,买票其实就是将 TicketInfo 对象放到 LinkedBlockingDeque 中,售票其实就是将 TicketInfo 从LinkedBlockingDeque 中删除。

由于可能存在多个线程处于等待状态,所以唤醒的时候必须使用 notifyAll 来唤醒。

以上代码的打印结果为:

8已经卖到票了
8的买票流程已经得到受理
4已经卖到票了
5已经卖到票了
9已经卖到票了
2已经卖到票了
7已经卖到票了
1已经卖到票了
10已经卖到票了
3已经卖到票了
6已经卖到票了
4的买票流程已经得到受理
5的买票流程已经得到受理
9的买票流程已经得到受理
2的买票流程已经得到受理
7的买票流程已经得到受理
1的买票流程已经得到受理
10的买票流程已经得到受理
3的买票流程已经得到受理
6的买票流程已经得到受理

以上代码用for循环不停的 new Thread,这是不可取的,应该改成线程池,线程池的讲解会在本系列后面的篇章会有针对性地说明。

下面说明一下线程休息室的基本常识:

执行 wait 方法之后被加入到与 monitor 相关联的 wait set 中,wait set 就线程休息室,当另一个线程调用该 monitor 的 notify 方法之后,
其中一个线程会从 wait set 中弹出;
如果执行 notifyAll 则不需要考虑是哪个线程被弹出,因为 wait set 中的所有 wait 线程都会被弹出。

[本章完...]