Java并发编程中Lock机制如何实现线程同步?

2026-05-06 06:341阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2009个文字,预计阅读时间需要9分钟。

Java并发编程中Lock机制如何实现线程同步?

一、协调生产/消费需求本文主要介绍Lock与Condition的使用方法,旨在更好地理解Lock锁与Condition信号量的使用。我们将动手实现一个ArrayBlockingQueue。

二、Lock与Condition的使用Lock与Condition是Java并发编程中的重要工具,用于实现线程间的同步与通信。以下是一些基本的使用方法:

1. Lock接口的使用javaLock lock=new ReentrantLock();lock.lock();try { // 执行需要同步的代码} finally { lock.unlock();}

2. Condition的使用javaLock lock=new ReentrantLock();Condition condition=lock.newCondition();

lock.lock();try { // 等待某个条件 condition.await(); // 执行条件满足后的代码} finally { lock.unlock();}

// 发送信号lock.lock();try { // 条件满足,唤醒一个等待线程 condition.signal(); // 或者唤醒所有等待线程 condition.signalAll();} finally { lock.unlock();}

三、ArrayBlockingQueue的实现为了更好地理解Lock与Condition,我们将实现一个基于Lock与Condition的ArrayBlockingQueue。

javaclass ArrayBlockingQueue { private final T[] items; private int takeIndex; private int putIndex; private final int capacity; private final Lock lock; private final Condition notEmpty; private final Condition notFull;

public ArrayBlockingQueue(int capacity) { this.capacity=capacity; this.items=(T[]) new Object[capacity]; this.lock=new ReentrantLock(); this.notEmpty=lock.newCondition(); this.notFull=lock.newCondition(); }

public void put(T x) throws InterruptedException { lock.lock(); try { while (putIndex==takeIndex) { notFull.await(); } items[putIndex]=x; putIndex=(putIndex + 1) % capacity; notEmpty.signal(); } finally { lock.unlock(); } }

public T take() throws InterruptedException { lock.lock(); try { while (takeIndex==putIndex) { notEmpty.await(); } T x=items[takeIndex]; items[takeIndex]=null; takeIndex=(takeIndex + 1) % capacity; notFull.signal(); return x; } finally { lock.unlock(); } }}

通过以上实现,我们成功地使用Lock与Condition实现了ArrayBlockingQueue。

一、协调生产/消费的需求

本文内容主要想向大家介绍一下Lock结合Condition的使用方法,为了更好的理解Lock锁与Condition锁信号,我们来手写一个ArrayBlockingQueue。 JDK实际上已经有这个类,基于Lock锁与Condition锁信号实现的,当然JDK实现代码很复杂包含了更严谨的逻辑校验,以及从性能优化的角度做了更多的工作。本文中我们只是来简单实现一下其核心逻辑:

  • ArrayBlockingQueue初始化构造时指定容量上限最大值
  • 提供put方法,当达到Queue队列容量上限最大值,阻塞生产数据的线程。
  • put方法生产数据之后,队列肯定是不为空,通知消费者线程进行消费。
  • 提供take方法,当Queue队列容量为0时候,阻塞消费数据的线程。
  • take方法执行之后,队列肯定不是满的,通知生产者线程进行生产。
  • 一条数据只能被take一次,take之后数据从queue中删除

相信实现完成上面的逻辑之后,java并发编程之Lock锁与Condition锁信号,你肯定是掌握了!其实这个逻辑基本上就是kafka生产者客户端缓冲队列,批量进行数据发送的实现逻辑。区别是take方法一次取出缓冲区所有数据,本文take方法一次取出一条数据。

二、构造方法

构造队列的方法很简单,使用一个List作为数据存储队列,并指定其容量。到此我们还没有实现容量判断,以及阻塞线程的功能。

//类成员变量-存储数据的队列 private List<Object> queue; //类成员变量-存储队列的容量上限 private int queueSize; public MyBlockingQueue(int queueSize) { this.queueSize = queueSize; queue = new ArrayList<>(queueSize);//存储消息的集合 } 三、Lock& Condition逻辑设计

首先我们要有一把锁,保证数据put与take操作的同步性,即:一条数据只能被take一次,take之后数据从queue中删除;以及创建Condition逻辑都需要Lock锁。学过java基础并发编程的同学,可以把Lock锁理解为Synchronized 同步代码块功能是一样的。我写过一个专栏《java并发编程》中介绍了二者的区别,欢迎关注。

private Lock lock = new ReentrantLock();//锁

Condition逻辑大家可以理解为传统JDK多线程编程中的wait与notify,但是Condition的语义更容易被理解。如下文代码所示:

private Condition notFull = lock.newCondition(); //队列不为满 notFull.signal(); //通知生产者队列不为满,可以继续生产数据(通常在消费者拿走数据之后,调用) notFull.await(); //队列已满,阻塞生产线程(await对condition逻辑取反)

private Condition notEmpty = lock.newCondition(); //队列不为空 notEmpty.signal(); //通知消费者线程队列不为空,可以继续消费数据(通常在生产者生产数据之后,调用) notEmpty.await(); //队列已经空了,阻塞消费线程(await对condition逻辑取反)

大家在使用Lock& Condition进行线程同步协调的时候,一定像我一样先把condition的逻辑语义设计好

  • 将当xxxx时候的表达,设计为condition。
  • 当情况满足condition的时候发出信号signal()通知其他线程;
  • 当情况与condtion正好相反的的时候,使用await阻塞当前线程。
四、put放入数据

其实最重要的就是完成Lock& Condition逻辑设计,剩下的就是填空了,模板如下

Java并发编程中Lock机制如何实现线程同步?

通过while循环判断队列当前容量是否达到容量上限,如果达到上限就表示队列满了。队列满了(notFull取反使用await),await阻塞生产线程向队列中继续放入数据。在这里,有小伙伴曾经问过我一个奇葩的问题:多线程持有同一个lock锁,你怎么知道阻塞的是生产线程,而不是消费线程呢? 答:一个线程是生产线程还是消费线程,取决于它的动作(调用什么方法),并没有一个标签给它定义死,调用put方法放入数据的就是生产数据的线程。while/await组合是标准写法,请不要随意创新改成if,否则你会遇到很多诡异的bug。

//队列满了,await阻塞生产线程 while (queue.size() >= queueSize) { System.out.println(Thread.currentThread().getName() + "等待,因为队列满了" ); notFull.await(); }

向队列中添加一条数据,此时我们可以确定队列是notEmpty,所以使用notEmpty.signal()向生产者发送信号。这里问题又来了:多线程持有同一个lock锁,你怎么知道通知的是消费者线程,而不是生产者线程呢? 答案是我确实不知道,所以在上文中的while (queue.size() >= queueSize)采用的是while,而不是if。即使生产者线程被唤醒了,while判断也会把它await拦住。

//向队列添加一条消息,同时通知消费者有新消息了 queue.add(message); System.out.println(Thread.currentThread().getName() + "生产" + message ); notEmpty.signal();//通知消费者线程 五、take消费数据

take从队列中取出数据,取出数据之后,队列肯定是notFull ,所以发出notFull.signal信号。当队列空了(notEmpty使用await取反),await同时阻塞消费者线程。

public Object take() throws InterruptedException { Object retVal = null; lock.lock();//操作队列先加锁 try { //队列空了,通知生产线程,消费线程阻塞 while (queue.size() == 0) { System.out.println("队列已经空了,停止消费!"); notEmpty.await(); } //队列删除一条消息,同时通知生产者队列有位置了 retVal = queue.get(0); queue.remove(0); notFull.signal(); //同时通知生产者队列 } finally { lock.unlock(); } return retVal; }

我相信有了上面put方法的基础,理解take方法中的代码,就非常容易了,这里我就不做过多的说明了。

六、生产消费测试

public static void main(String[] args) { //为了方便查看测试结果,我们的队列容量设置小一些 MyBlockingQueue queue = new MyBlockingQueue(2); //生产者线程 new Thread(()->{ for(int i = 0;i < 5;i++){ try { queue.put("msg" + i); //放入5条数据 } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); //消费者线程 new Thread(()->{ while(true){ //一直消费 try { System.out.println(Thread.currentThread().getName() + "消费数据" + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }

输出结果如下,满足我们的需求。队列满了,生产者线程Thread-0等待;生产消费互相协调通知,最终数据消费完成,队列空了,消费者线程阻塞。

Thread-0生产msg0 Thread-0生产msg1 Thread-0等待,因为队列满了 Thread-1消费数据msg0 Thread-0生产msg2 Thread-0等待,因为队列满了 Thread-1消费数据msg1 Thread-0生产msg3 Thread-0等待,因为队列满了 Thread-1消费数据msg2 Thread-0生产msg4 Thread-1消费数据msg3 Thread-1消费数据msg4 队列已经空了,停止消费! 欢迎关注我的博客,更多精品知识合集

本文转载注明出处(必须带连接,不能只转文字):字母哥博客 - zimug.com

觉得对您有帮助的话,帮我点赞、分享!您的支持是我不竭的创作动力!。另外,笔者最近一段时间输出了如下的精品内容,期待您的关注。

  • 《kafka修炼之道》
  • 《手摸手教你学Spring Boot2.0》
  • 《Spring Security-JWT-OAuth2一本通》
  • 《实战前后端分离RBAC权限管理系统》
  • 《实战SpringCloud微服务从青铜到王者》

本文共计2009个文字,预计阅读时间需要9分钟。

Java并发编程中Lock机制如何实现线程同步?

一、协调生产/消费需求本文主要介绍Lock与Condition的使用方法,旨在更好地理解Lock锁与Condition信号量的使用。我们将动手实现一个ArrayBlockingQueue。

二、Lock与Condition的使用Lock与Condition是Java并发编程中的重要工具,用于实现线程间的同步与通信。以下是一些基本的使用方法:

1. Lock接口的使用javaLock lock=new ReentrantLock();lock.lock();try { // 执行需要同步的代码} finally { lock.unlock();}

2. Condition的使用javaLock lock=new ReentrantLock();Condition condition=lock.newCondition();

lock.lock();try { // 等待某个条件 condition.await(); // 执行条件满足后的代码} finally { lock.unlock();}

// 发送信号lock.lock();try { // 条件满足,唤醒一个等待线程 condition.signal(); // 或者唤醒所有等待线程 condition.signalAll();} finally { lock.unlock();}

三、ArrayBlockingQueue的实现为了更好地理解Lock与Condition,我们将实现一个基于Lock与Condition的ArrayBlockingQueue。

javaclass ArrayBlockingQueue { private final T[] items; private int takeIndex; private int putIndex; private final int capacity; private final Lock lock; private final Condition notEmpty; private final Condition notFull;

public ArrayBlockingQueue(int capacity) { this.capacity=capacity; this.items=(T[]) new Object[capacity]; this.lock=new ReentrantLock(); this.notEmpty=lock.newCondition(); this.notFull=lock.newCondition(); }

public void put(T x) throws InterruptedException { lock.lock(); try { while (putIndex==takeIndex) { notFull.await(); } items[putIndex]=x; putIndex=(putIndex + 1) % capacity; notEmpty.signal(); } finally { lock.unlock(); } }

public T take() throws InterruptedException { lock.lock(); try { while (takeIndex==putIndex) { notEmpty.await(); } T x=items[takeIndex]; items[takeIndex]=null; takeIndex=(takeIndex + 1) % capacity; notFull.signal(); return x; } finally { lock.unlock(); } }}

通过以上实现,我们成功地使用Lock与Condition实现了ArrayBlockingQueue。

一、协调生产/消费的需求

本文内容主要想向大家介绍一下Lock结合Condition的使用方法,为了更好的理解Lock锁与Condition锁信号,我们来手写一个ArrayBlockingQueue。 JDK实际上已经有这个类,基于Lock锁与Condition锁信号实现的,当然JDK实现代码很复杂包含了更严谨的逻辑校验,以及从性能优化的角度做了更多的工作。本文中我们只是来简单实现一下其核心逻辑:

  • ArrayBlockingQueue初始化构造时指定容量上限最大值
  • 提供put方法,当达到Queue队列容量上限最大值,阻塞生产数据的线程。
  • put方法生产数据之后,队列肯定是不为空,通知消费者线程进行消费。
  • 提供take方法,当Queue队列容量为0时候,阻塞消费数据的线程。
  • take方法执行之后,队列肯定不是满的,通知生产者线程进行生产。
  • 一条数据只能被take一次,take之后数据从queue中删除

相信实现完成上面的逻辑之后,java并发编程之Lock锁与Condition锁信号,你肯定是掌握了!其实这个逻辑基本上就是kafka生产者客户端缓冲队列,批量进行数据发送的实现逻辑。区别是take方法一次取出缓冲区所有数据,本文take方法一次取出一条数据。

二、构造方法

构造队列的方法很简单,使用一个List作为数据存储队列,并指定其容量。到此我们还没有实现容量判断,以及阻塞线程的功能。

//类成员变量-存储数据的队列 private List<Object> queue; //类成员变量-存储队列的容量上限 private int queueSize; public MyBlockingQueue(int queueSize) { this.queueSize = queueSize; queue = new ArrayList<>(queueSize);//存储消息的集合 } 三、Lock& Condition逻辑设计

首先我们要有一把锁,保证数据put与take操作的同步性,即:一条数据只能被take一次,take之后数据从queue中删除;以及创建Condition逻辑都需要Lock锁。学过java基础并发编程的同学,可以把Lock锁理解为Synchronized 同步代码块功能是一样的。我写过一个专栏《java并发编程》中介绍了二者的区别,欢迎关注。

private Lock lock = new ReentrantLock();//锁

Condition逻辑大家可以理解为传统JDK多线程编程中的wait与notify,但是Condition的语义更容易被理解。如下文代码所示:

private Condition notFull = lock.newCondition(); //队列不为满 notFull.signal(); //通知生产者队列不为满,可以继续生产数据(通常在消费者拿走数据之后,调用) notFull.await(); //队列已满,阻塞生产线程(await对condition逻辑取反)

private Condition notEmpty = lock.newCondition(); //队列不为空 notEmpty.signal(); //通知消费者线程队列不为空,可以继续消费数据(通常在生产者生产数据之后,调用) notEmpty.await(); //队列已经空了,阻塞消费线程(await对condition逻辑取反)

大家在使用Lock& Condition进行线程同步协调的时候,一定像我一样先把condition的逻辑语义设计好

  • 将当xxxx时候的表达,设计为condition。
  • 当情况满足condition的时候发出信号signal()通知其他线程;
  • 当情况与condtion正好相反的的时候,使用await阻塞当前线程。
四、put放入数据

其实最重要的就是完成Lock& Condition逻辑设计,剩下的就是填空了,模板如下

Java并发编程中Lock机制如何实现线程同步?

通过while循环判断队列当前容量是否达到容量上限,如果达到上限就表示队列满了。队列满了(notFull取反使用await),await阻塞生产线程向队列中继续放入数据。在这里,有小伙伴曾经问过我一个奇葩的问题:多线程持有同一个lock锁,你怎么知道阻塞的是生产线程,而不是消费线程呢? 答:一个线程是生产线程还是消费线程,取决于它的动作(调用什么方法),并没有一个标签给它定义死,调用put方法放入数据的就是生产数据的线程。while/await组合是标准写法,请不要随意创新改成if,否则你会遇到很多诡异的bug。

//队列满了,await阻塞生产线程 while (queue.size() >= queueSize) { System.out.println(Thread.currentThread().getName() + "等待,因为队列满了" ); notFull.await(); }

向队列中添加一条数据,此时我们可以确定队列是notEmpty,所以使用notEmpty.signal()向生产者发送信号。这里问题又来了:多线程持有同一个lock锁,你怎么知道通知的是消费者线程,而不是生产者线程呢? 答案是我确实不知道,所以在上文中的while (queue.size() >= queueSize)采用的是while,而不是if。即使生产者线程被唤醒了,while判断也会把它await拦住。

//向队列添加一条消息,同时通知消费者有新消息了 queue.add(message); System.out.println(Thread.currentThread().getName() + "生产" + message ); notEmpty.signal();//通知消费者线程 五、take消费数据

take从队列中取出数据,取出数据之后,队列肯定是notFull ,所以发出notFull.signal信号。当队列空了(notEmpty使用await取反),await同时阻塞消费者线程。

public Object take() throws InterruptedException { Object retVal = null; lock.lock();//操作队列先加锁 try { //队列空了,通知生产线程,消费线程阻塞 while (queue.size() == 0) { System.out.println("队列已经空了,停止消费!"); notEmpty.await(); } //队列删除一条消息,同时通知生产者队列有位置了 retVal = queue.get(0); queue.remove(0); notFull.signal(); //同时通知生产者队列 } finally { lock.unlock(); } return retVal; }

我相信有了上面put方法的基础,理解take方法中的代码,就非常容易了,这里我就不做过多的说明了。

六、生产消费测试

public static void main(String[] args) { //为了方便查看测试结果,我们的队列容量设置小一些 MyBlockingQueue queue = new MyBlockingQueue(2); //生产者线程 new Thread(()->{ for(int i = 0;i < 5;i++){ try { queue.put("msg" + i); //放入5条数据 } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); //消费者线程 new Thread(()->{ while(true){ //一直消费 try { System.out.println(Thread.currentThread().getName() + "消费数据" + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); }

输出结果如下,满足我们的需求。队列满了,生产者线程Thread-0等待;生产消费互相协调通知,最终数据消费完成,队列空了,消费者线程阻塞。

Thread-0生产msg0 Thread-0生产msg1 Thread-0等待,因为队列满了 Thread-1消费数据msg0 Thread-0生产msg2 Thread-0等待,因为队列满了 Thread-1消费数据msg1 Thread-0生产msg3 Thread-0等待,因为队列满了 Thread-1消费数据msg2 Thread-0生产msg4 Thread-1消费数据msg3 Thread-1消费数据msg4 队列已经空了,停止消费! 欢迎关注我的博客,更多精品知识合集

本文转载注明出处(必须带连接,不能只转文字):字母哥博客 - zimug.com

觉得对您有帮助的话,帮我点赞、分享!您的支持是我不竭的创作动力!。另外,笔者最近一段时间输出了如下的精品内容,期待您的关注。

  • 《kafka修炼之道》
  • 《手摸手教你学Spring Boot2.0》
  • 《Spring Security-JWT-OAuth2一本通》
  • 《实战前后端分离RBAC权限管理系统》
  • 《实战SpringCloud微服务从青铜到王者》