探索JUC的神秘领域是怎样的体验?

2026-05-19 18:521阅读0评论SEO问题
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计4960个文字,预计阅读时间需要20分钟。

探索JUC的神秘领域是怎样的体验?

概念 + 同步锁:synchronized、Lock 区别 + 1 + synchronized 是不需要手动解锁的 + 2 + synchronized 可锁定方法、锁同步代码块 + 3 + synchronized 是 Java 自带的关键字 + 4 + Lock 锁是一个类,它具有 synchronized 的所有功能

概念

同步锁:synchronized、Lock区别

1、synchronized是不需要进行手动解锁

2、synchronized可以锁方法、锁同步代码块

3、synchronized是Java自带关键字

4、Lock锁是一个类且它拥有synchronized的所有功能还具备扩展

5、Lock锁的实现类ReentrantLock可以实现公平和非公平锁

6、Lock锁需要手动加锁和手动解锁

7、synchronized不可中断而Lock锁可以实现中断

  • synchronized

    • 当修饰方法时:锁的是方法调用者(this)
    • 当使用static synchronized修饰方法时,锁的是Class对象(类名.class)
    • 也可以使用代码块方式来锁取Class对象(类名.class)
  • Lock : 主要使用到的实现类ReentrantLock(可重入锁)

    • ReentrantLock() -> 非公平锁(默认)(所谓非公平锁既是可以进行插队操作)
    • ReentrantLock(true) -> 公平锁(所谓公平锁就是需要排队,不可以进行插队操作)

集合的线程不安全情况和解决方案

List : ArrayList不安全List,但是在单线程情况下是高效的!

多线程下错误案例

List<Integer> list = new ArrayList<>(); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ list.add(temp); System.out.println(list); }, String.valueOf(temp)).start(); } //结果出现并发修改异常ConcurrentModificationException [0, 1, 2, 3, 5, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 18, 20, 21, 23, 22, 24, 25, 26, 27, 29, 28] //Exception in thread "11" Exception in thread "15" Exception in thread "19" java.util.ConcurrentModificationException

解决方案

//1.使用集合安全类进行转换 List<Integer> list = Collections.synchronizedList(new ArrayList<>()); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ list.add(temp); System.out.println(list); }, String.valueOf(temp)).start(); } //2.使用List对应的Vector List<Integer> list = new Vector<>(); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ list.add(temp); System.out.println(list); }, String.valueOf(temp)).start(); } //3.使用CopyOnWriteArrayList List<Integer> list = new CopyOnWriteArrayList<>(); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ list.add(temp); System.out.println(list); }, String.valueOf(temp)).start(); }

Set集合:HashSet

多线程下错误案例

Set<Integer> set = new HashSet<>(); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ set.add(temp); System.out.println(set); }, String.valueOf(temp)).start(); } // 结果:抛出ConcurrentModificationException并发修改异常!

解决方案:

// 1.使用Collections.synchronizedSet安全集合包装 Set<Integer> set = Collections.synchronizedSet(new HashSet<>()); // 2.使用CopyOnWriteArraySet Set<Integer> set = new CopyOnWriteArraySet<>();

map集合:HashMap

Map<String, Object> map = new HashMap<>(); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ map.put(String.valueOf(temp), temp); System.out.println(map); }, String.valueOf(temp)).start(); } //结果:Exception in thread "6" java.util.ConcurrentModificationException并发修改异常

解决方案:

// 1.ConcurrentHashMap Map<String, Object> map = new ConcurrentHashMap<>(); for (int i = 0; i < 70; i++) { final Integer temp = i; new Thread(()->{ map.put(String.valueOf(temp), temp); System.out.println(map); }, String.valueOf(temp)).start(); } // 2.Hashtable(效率低) 常用线程辅助类

CountDownLatch(减法计数器)

CountDownLatch countDownLatch = new CountDownLatch(10); // 传入一个数字,要执行多少次 countDownLatch.countDown(); // 每次执行完一个任务后,进行减1操作 countDownLatch.await(); // 等待计数器归零,只有等上面执行次数完毕后,才能执行后面的操作

CyclicBarrier(加法计数器)

CyclicBarrier cyclicBarrier = new CyclicBarrier(10); // 初始化计数器容量,默认构造Runnable为null // 当计数器到达10的时候,就执行Runnable里面的具体操作 CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() { @Override public void run() { System.out.println("executor other thing!"); } }); cyclicBarrier.await(); // 等待计数器到达初始化计数器值,然后才能执行下面操作!

栗子:

//初始化cyclicBarrier加法计数器 CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() { @Override public void run() { System.out.println("执行到第十个啦,完结!"); } }); for (int i = 1; i <= 10; i++) { int u = i; new Thread(()->{ try { System.out.println("执行到第" + u +"个了, 还剩" + (10 - u) + "个"); //每执行完一个线程就进行加一操作!当执行完第十个就触发cyclicBarrier初始化中的Runnable接口实现 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); }

Semaphore(信号量)

Semaphore : 一般用于限流情况

semaphore.acquire():获得线程使用权限

semaphore.release():释放线程使用权限

Semaphore semaphore = new Semaphore(2); for (int i = 1; i <= 4; i++) { new Thread(() -> { try { // 得到线程执行权限,当线程数到达了信号量初始化容量,其他线程就会等待(阻塞)当前线程执行完毕并释放执行权限才可继续执行! semaphore.acquire(); System.out.println("当前线程:" + Thread.currentThread().getName() + "开始执行..."); TimeUnit.SECONDS.sleep(2); System.out.println("当前线程:" + Thread.currentThread().getName() + "执行完毕"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); //释放线程执行权限 } }, String.valueOf(i)).start(); } } // 结果 当前线程:1开始执行... 当前线程:2开始执行... 当前线程:1执行完毕 当前线程:2执行完毕 // 到达信号量最大容量,其他线程就进行等待(阻塞) 当前线程:3开始执行... 当前线程:4开始执行... 当前线程:3执行完毕 当前线程:4执行完毕 读写锁

ReadWriteLock

主要使用到:ReentrantReadWriteLock(实现类)

概念

  • 读写锁共存

    • 读 -> 读 可以共存
    • 读 -> 写 不能共存(不能边修改边读取,就会出现读取的数据不正确情况)
    • 写 -> 写 不能共存(可能出现一个线程正在修改原来的值,另一个线程也在修改原来的值,出现两个线程修改后,最后读取的数据不是自己修改的数据)
  • 独占/共享锁

    • 独占锁:也就是写锁,同一时刻只能有一个线程可以对数据进行写的操作
    • 共享锁:也就是读锁,同一时刻可以出现多个线程对数据进行读取的操作,且读取的数据都是同一份数据

//开启两个读写线程,分别进行写和读操作 for (int i = 0; i < 5; i++) { final Integer temp = i; new Thread(()->{ mapDemo.put(String.valueOf(temp), temp + 10000); }, "线程->" + String.valueOf(temp)).start(); } for (int i = 5; i < 10; i++) { final Integer temp = i; new Thread(()->{ mapDemo.get(String.valueOf(temp)); }, "线程->" + String.valueOf(temp)).start(); } // 初始化读写锁 private volatile Map<String, Object> map = new ConcurrentHashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void put(String key, Object value) { //写入加锁 readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "开始写入....."); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完毕....."); }finally { //写完释放锁 readWriteLock.writeLock().unlock(); } } public Object get(String key) { //读取加锁 readWriteLock.readLock().lock(); Object object = null; try { System.out.println(Thread.currentThread().getName() + "开始读取----------->"); object = map.get(key); System.out.println(Thread.currentThread().getName() + "读取完成----------->"); }finally { //读取解锁 readWriteLock.readLock().unlock(); } return object; } // 运行结果:发现写入的时候,总是只有一个线程可以在同一时间进行写入,而读取可以多个线程同时读取 线程->1开始写入..... 线程->1写入完毕..... 线程->0开始写入..... 线程->0写入完毕..... 线程->3开始写入..... 线程->3写入完毕..... 线程->2开始写入..... 线程->2写入完毕..... 线程->4开始写入..... 线程->4写入完毕..... 线程->5开始读取-----------> 线程->5读取完成-----------> 线程->7开始读取-----------> 线程->8开始读取-----------> 线程->8读取完成-----------> 线程->6开始读取-----------> 线程->9开始读取-----------> 线程->9读取完成-----------> 线程->7读取完成-----------> 线程->6读取完成-----------> 阻塞队列

  • ArrayBlockingQueue
    • add()与offer()区别:add在超出容量时会抛出异常,而offer则不会抛出异常,而是拒绝添加到队列中!
    • 移除区别(remove()与poll()区别):当队列中无元素时,remove会抛出异常,而poll则是返回null
    • 查看队首(element()与peek()区别):当队列为空时,element会抛出异常,而peek

ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); arrayBlockingQueue.add("A"); arrayBlockingQueue.add("B"); arrayBlockingQueue.add("C"); arrayBlockingQueue.add("D"); // arrayBlockingQueue.add("E"); System.out.println(arrayBlockingQueue); //结果: [A, B, C, D] /** 注意: 1. 当元素超过队列的容量时,就会抛出异常java.lang.IllegalStateException: Queue full 2. 当添加null时,抛出空指针异常 java.lang.NullPointerException 3.使用offer代替add使用 */ //1.错误案例:容量为4,但是添加了五个元素 ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); arrayBlockingQueue.add("A"); arrayBlockingQueue.add("B"); arrayBlockingQueue.add("C"); arrayBlockingQueue.add("D"); arrayBlockingQueue.add("E"); System.out.println(arrayBlockingQueue); //结果: Exception in thread "main" java.lang.IllegalStateException: Queue full //2.错误案例:添加null数据,抛出空指针异常 ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); arrayBlockingQueue.add("A"); arrayBlockingQueue.add("B"); arrayBlockingQueue.add("C"); arrayBlockingQueue.add(null); System.out.println(arrayBlockingQueue); //结果: Exception in thread "main" java.lang.NullPointerException //3.使用offer代替add添加元素 ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); arrayBlockingQueue.offer("A"); arrayBlockingQueue.offer("B"); arrayBlockingQueue.offer("C"); arrayBlockingQueue.offer("D"); arrayBlockingQueue.offer("E"); System.out.println(arrayBlockingQueue); //add与offer区别:add在超出容量时会抛出异常,而offer则不会抛出异常,而是拒绝添加到队列中! //结果: [A, B, C, D]

  • ArrayBlockingQueue 延迟等待

    //延迟添加等待----------------> offer() ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); System.out.println(arrayBlockingQueue.offer("A")); System.out.println(arrayBlockingQueue.offer("B")); System.out.println(arrayBlockingQueue.offer("C")); System.out.println(arrayBlockingQueue.offer("D")); //延迟12秒添加,如果队列已满就返回false(表示添加失败) System.out.println(arrayBlockingQueue.offer("E", 12, TimeUnit.SECONDS)); System.out.println(arrayBlockingQueue); //结果: true true true true false [A, B, C, D] // 延迟取出等待-------------> poll() ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); arrayBlockingQueue.offer("A"); arrayBlockingQueue.offer("B"); arrayBlockingQueue.offer("C"); arrayBlockingQueue.offer("D"); arrayBlockingQueue.offer("E", 2, TimeUnit.SECONDS); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS)); // 结果: A B C D //延迟等待2秒钟再弹出,如果队列为空,就返回null null

  • 同步队列(SynchronousQueue)

    • 特性:只能存储一个对象/值,当存入之后必须等待取出之后才能进行再次存入

栗子:

new Thread(()->{ try { synchronousQueue.put(1); System.out.println(Thread.currentThread().getName() + ":put " + 1); synchronousQueue.put(2); System.out.println(Thread.currentThread().getName() + ":put " + 2); synchronousQueue.put(3); System.out.println(Thread.currentThread().getName() + ":put " + 3); } catch (InterruptedException e) { e.printStackTrace(); } } , "put线程:").start(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName() + ":take -> " + synchronousQueue.take()); System.out.println(Thread.currentThread().getName() + ":take -> " + synchronousQueue.take()); System.out.println(Thread.currentThread().getName() + ":take -> " + synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } , "take线程:").start(); //结果: put线程::put 1 take线程::take -> 1 put线程::put 2 take线程::take -> 2 put线程::put 3 take线程::take -> 3 线程池

  • 线程复用(节约了系统资源)

    探索JUC的神秘领域是怎样的体验?

  • 控制最大并发数(当达到线程池容量,就需要等待其他线程完成,才能继续进入)

  • 管理线程

  • Executors线程池

ExecutorService executorService = Executors.newSingleThreadExecutor(); // 单个线程的池子 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启十个固定线程的池子 ExecutorService executorService = Executors.newCachedThreadPool(); //可伸缩线程池, 如果线程池中线程已全被使用就创建新的线程池

newSingleThreadExecutor

ExecutorService executorService = Executors.newSingleThreadExecutor(); // 单个线程的池子 try { for (int i1 = 0; i1 < 10; i1++) { //执行线程 executorService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 执行了线程.."); } }); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //结果:只有一个线程在重复利用执行 pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程..

newFixedThreadPool

ExecutorService executorService = Executors.newFixedThreadPool(5); //开启十个固定线程的池子 try { for (int i = 0; i < 10; i++) { //执行线程 executorService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 执行了线程.."); } }); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //结果:五个不同的线程重复使用 pool-1-thread-4 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-4 执行了线程.. pool-1-thread-3 执行了线程.. pool-1-thread-2 执行了线程.. pool-1-thread-5 执行了线程.. pool-1-thread-2 执行了线程.. pool-1-thread-3 执行了线程.. pool-1-thread-4 执行了线程.. pool-1-thread-1 执行了线程..

newCachedThreadPool

ExecutorService executorService = Executors.newCachedThreadPool(); //可伸缩线程池, 如果线程池中线程已全被使用就创建新的线程池 try { for (int i = 0; i < 10; i++) { //开启线程 executorService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 执行了线程.."); } }); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //结果:开启新线程,当已开启的线程执行完毕,放入池子中又可以进行使用,如果开启的线程都还在执行中,就创建新的线程 pool-1-thread-1 执行了线程.. pool-1-thread-6 执行了线程.. pool-1-thread-5 执行了线程.. pool-1-thread-3 执行了线程.. pool-1-thread-4 执行了线程.. pool-1-thread-2 执行了线程.. pool-1-thread-8 执行了线程.. pool-1-thread-7 执行了线程.. pool-1-thread-9 执行了线程.. pool-1-thread-10 执行了线程..

线程池参数

  • 7大参数

public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //线程存活时间 TimeUnit unit, //线程时间单元 BlockingQueue<Runnable> workQueue) { //阻塞队列 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory() //默认线程工厂 , defaultHandler); //线程拒绝策略(当达到了最大线程数时,采用线程拒绝策略) }

Spring自带的任务执行器线程池

@Bean("scheduledTaskExecutor") public ThreadPoolTaskExecutor scheduledTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //创建任务执行器线程池 executor.setCorePoolSize(3);//设置核心线程数 executor.setMaxPoolSize(5); //设置最大线程数 executor.setQueueCapacity(1024*100); //设置一个队列容量 executor.setThreadNamePrefix("parking-index-task"); //线程名称 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略 executor.initialize(); //初始化线程池 return executor; }

线程四大拒绝策略应用场景

AbortPolicy: 当队列中线程已满,就抛出异常 DiscardPolicy:当队列满了,就丢弃任务,不会抛出异常 CallerRunsPolicy: 队列已满时,就使用调用者的线程去执行,当处理器关闭就丢弃此线程需求 DiscardOldestPolicy:当队列满了,去尝试和较早的线程竞争,当最早的线程即将执行完成就把当前任务使用即将完成的线程执行 源码解释: AbortPolicy: public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always 解释:总是把RejectedExecutionException。Params: r—请求执行的可运行任务e—尝试执行该任务的执行器抛出:RejectedExecutionException—always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } DiscardPolicy: public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task 解释:什么都不做,这有丢弃任务r的效果。参数:r -请求被执行的可运行任务e -试图执行该任务的执行程序 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } CallerRunsPolicy: public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task 解释:在调用者的线程中执行任务r,除非执行器已经关闭,在这种情况下,任务将被丢弃。参数:r—请求执行的可运行任务e—尝试执行该任务的执行程序 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } DiscardOldestPolicy: public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task 解释:获取并忽略执行器将执行的下一个任务(如果有一个任务立即可用),然后重试执行任务r,除非执行器被关闭,在这种情况下,任务r将被丢弃。参数:r—请求执行的可运行任务e—尝试执行该任务的执行程序 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); }

Cpu密集型

int availableProcessors = Runtime.getRuntime().availableProcessors(); //获取Cpu核数,适合设置核心线程池的大小

IO密集型

int availableProcessors = Runtime.getRuntime().availableProcessors(); // int maximumPoolSize = availableProcessors * 2; // Io密集型一般设置为Cpu核数的两倍,防止 ForkJoin

  • 任务拆分

public class DoMain extends RecursiveTask<Long> { private Long start; private Long end; private final Long threshold = 10_0000_0000L; public DoMain(Long start, Long end) { this.start = start; this.end = end; } /** 递归分解大数据,每次进行两段两段操作 */ @Override protected Long compute() { Long res = 0L; if ((end - start) > threshold) { Long middle = (end + start) / 2; //分两次进行计算 ForkJoinTask<Long> fork1 = new DoMain(start, middle).fork(); Long res1 = fork1.join(); ForkJoinTask<Long> fork2 = new DoMain(middle, end).fork(); Long res2 = fork2.join(); res = res1 + res2; } else { for (Long i = start; i < end; i++) { res += i; } } return res; } } //这样创建线程不规范,这里只是简易操作! new Thread(() -> { long l = System.currentTimeMillis(); DoMain doMain = new DoMain(0L, 500_0000_0000L); ForkJoinTask<Long> submit = new ForkJoinPool().submit(doMain); try { System.out.println("forkJoin输出结果:" + submit.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("forkJoin所用时间: " + (System.currentTimeMillis() - l)); }).start(); //这样创建线程不规范,这里只是简易操作! new Thread(() -> { long start = System.currentTimeMillis(); Long res = 0L; for (Long i = 0L; i < 500_0000_0000L; i++) { res += i; } System.out.println("普通循环输出结果:" + res); System.out.println("普通所用时间" + (System.currentTimeMillis() - start)); }).start();

计算对比

stream环输出结果: 124999999750000000
stream所用时间2304
普通循环输出结果:124999999750000000
普通所用时间14116
forkJoin输出结果:124999999750000000
forkJoin所用时间: 14468

stream流计算

new Thread(() -> { long start = System.currentTimeMillis(); long longStream = LongStream.range(0L, 5_0000_0000L).parallel().reduce(0L, Long::sum); System.out.println("stream环输出结果: " + longStream); System.out.println("stream所用时间" + (System.currentTimeMillis() - start)); }).start(); volatile

  • 保证了可见性
  • 不保证原子性(也就是多线程情况下,无法保证同一个值被多个线程修改)
  • 保证了禁止指令重排(当程序启动时,它可能并不是按照我们代码的顺序执行,比如初始化,可能就不是按照我们写的代码步骤来的,这就是指令重排,保证指令不重排就可以使用volatile关键字进行声明)

/** * 1.使用volatile禁止指令重排 * 2. 使用AtomicInteger原子类保证是原子操作 */ public static volatile AtomicInteger num = new AtomicInteger(); public static void main(String[] args) { for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { //进行加一操作 num.getAndIncrement(); } }).start(); } //当线程数大于2时,暂停main线程,让给其他线程执行 while (Thread.activeCount() > 2) { Thread.yield(); } System.out.println(num);

原子类操作源码

public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } //原子类底层代码,使用了CAS(比较替换算法,也就是自旋锁) // compareAndSwapInt底层是调用c++操作内存,对应的是native关键字 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } CAS简单实现

栗子:

@SneakyThrows public static void main(String[] args) { CasLock casLock = new CasLock(); new Thread(()->{ try { casLock.lock(); }catch (Exception e) { e.printStackTrace(); }finally { casLock.unLock(); } }, "Thread1").start(); TimeUnit.SECONDS.sleep(2); new Thread(()->{ try { casLock.lock(); }catch (Exception e) { e.printStackTrace(); }finally { casLock.unLock(); } }, "Thread2").start(); } public static class CasLock { AtomicReference<Thread> lock = new AtomicReference<>(); public void lock() { Thread thread = Thread.currentThread(); if (lock.get() == null) { //拿到泛型中Thread的值进行比较 System.out.println(thread.getName() + "----> 开始自旋..."); } while (lock.compareAndSet(null, thread)) { } } public void unLock() { Thread thread = Thread.currentThread(); System.out.println(thread.getName() + "----> 解锁成功!"); //解锁 lock.compareAndSet(thread, null); } }

本文共计4960个文字,预计阅读时间需要20分钟。

探索JUC的神秘领域是怎样的体验?

概念 + 同步锁:synchronized、Lock 区别 + 1 + synchronized 是不需要手动解锁的 + 2 + synchronized 可锁定方法、锁同步代码块 + 3 + synchronized 是 Java 自带的关键字 + 4 + Lock 锁是一个类,它具有 synchronized 的所有功能

概念

同步锁:synchronized、Lock区别

1、synchronized是不需要进行手动解锁

2、synchronized可以锁方法、锁同步代码块

3、synchronized是Java自带关键字

4、Lock锁是一个类且它拥有synchronized的所有功能还具备扩展

5、Lock锁的实现类ReentrantLock可以实现公平和非公平锁

6、Lock锁需要手动加锁和手动解锁

7、synchronized不可中断而Lock锁可以实现中断

  • synchronized

    • 当修饰方法时:锁的是方法调用者(this)
    • 当使用static synchronized修饰方法时,锁的是Class对象(类名.class)
    • 也可以使用代码块方式来锁取Class对象(类名.class)
  • Lock : 主要使用到的实现类ReentrantLock(可重入锁)

    • ReentrantLock() -> 非公平锁(默认)(所谓非公平锁既是可以进行插队操作)
    • ReentrantLock(true) -> 公平锁(所谓公平锁就是需要排队,不可以进行插队操作)

集合的线程不安全情况和解决方案

List : ArrayList不安全List,但是在单线程情况下是高效的!

多线程下错误案例

List<Integer> list = new ArrayList<>(); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ list.add(temp); System.out.println(list); }, String.valueOf(temp)).start(); } //结果出现并发修改异常ConcurrentModificationException [0, 1, 2, 3, 5, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 18, 20, 21, 23, 22, 24, 25, 26, 27, 29, 28] //Exception in thread "11" Exception in thread "15" Exception in thread "19" java.util.ConcurrentModificationException

解决方案

//1.使用集合安全类进行转换 List<Integer> list = Collections.synchronizedList(new ArrayList<>()); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ list.add(temp); System.out.println(list); }, String.valueOf(temp)).start(); } //2.使用List对应的Vector List<Integer> list = new Vector<>(); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ list.add(temp); System.out.println(list); }, String.valueOf(temp)).start(); } //3.使用CopyOnWriteArrayList List<Integer> list = new CopyOnWriteArrayList<>(); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ list.add(temp); System.out.println(list); }, String.valueOf(temp)).start(); }

Set集合:HashSet

多线程下错误案例

Set<Integer> set = new HashSet<>(); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ set.add(temp); System.out.println(set); }, String.valueOf(temp)).start(); } // 结果:抛出ConcurrentModificationException并发修改异常!

解决方案:

// 1.使用Collections.synchronizedSet安全集合包装 Set<Integer> set = Collections.synchronizedSet(new HashSet<>()); // 2.使用CopyOnWriteArraySet Set<Integer> set = new CopyOnWriteArraySet<>();

map集合:HashMap

Map<String, Object> map = new HashMap<>(); for (int i = 0; i < 30; i++) { final Integer temp = i; new Thread(()->{ map.put(String.valueOf(temp), temp); System.out.println(map); }, String.valueOf(temp)).start(); } //结果:Exception in thread "6" java.util.ConcurrentModificationException并发修改异常

解决方案:

// 1.ConcurrentHashMap Map<String, Object> map = new ConcurrentHashMap<>(); for (int i = 0; i < 70; i++) { final Integer temp = i; new Thread(()->{ map.put(String.valueOf(temp), temp); System.out.println(map); }, String.valueOf(temp)).start(); } // 2.Hashtable(效率低) 常用线程辅助类

CountDownLatch(减法计数器)

CountDownLatch countDownLatch = new CountDownLatch(10); // 传入一个数字,要执行多少次 countDownLatch.countDown(); // 每次执行完一个任务后,进行减1操作 countDownLatch.await(); // 等待计数器归零,只有等上面执行次数完毕后,才能执行后面的操作

CyclicBarrier(加法计数器)

CyclicBarrier cyclicBarrier = new CyclicBarrier(10); // 初始化计数器容量,默认构造Runnable为null // 当计数器到达10的时候,就执行Runnable里面的具体操作 CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() { @Override public void run() { System.out.println("executor other thing!"); } }); cyclicBarrier.await(); // 等待计数器到达初始化计数器值,然后才能执行下面操作!

栗子:

//初始化cyclicBarrier加法计数器 CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() { @Override public void run() { System.out.println("执行到第十个啦,完结!"); } }); for (int i = 1; i <= 10; i++) { int u = i; new Thread(()->{ try { System.out.println("执行到第" + u +"个了, 还剩" + (10 - u) + "个"); //每执行完一个线程就进行加一操作!当执行完第十个就触发cyclicBarrier初始化中的Runnable接口实现 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); }

Semaphore(信号量)

Semaphore : 一般用于限流情况

semaphore.acquire():获得线程使用权限

semaphore.release():释放线程使用权限

Semaphore semaphore = new Semaphore(2); for (int i = 1; i <= 4; i++) { new Thread(() -> { try { // 得到线程执行权限,当线程数到达了信号量初始化容量,其他线程就会等待(阻塞)当前线程执行完毕并释放执行权限才可继续执行! semaphore.acquire(); System.out.println("当前线程:" + Thread.currentThread().getName() + "开始执行..."); TimeUnit.SECONDS.sleep(2); System.out.println("当前线程:" + Thread.currentThread().getName() + "执行完毕"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); //释放线程执行权限 } }, String.valueOf(i)).start(); } } // 结果 当前线程:1开始执行... 当前线程:2开始执行... 当前线程:1执行完毕 当前线程:2执行完毕 // 到达信号量最大容量,其他线程就进行等待(阻塞) 当前线程:3开始执行... 当前线程:4开始执行... 当前线程:3执行完毕 当前线程:4执行完毕 读写锁

ReadWriteLock

主要使用到:ReentrantReadWriteLock(实现类)

概念

  • 读写锁共存

    • 读 -> 读 可以共存
    • 读 -> 写 不能共存(不能边修改边读取,就会出现读取的数据不正确情况)
    • 写 -> 写 不能共存(可能出现一个线程正在修改原来的值,另一个线程也在修改原来的值,出现两个线程修改后,最后读取的数据不是自己修改的数据)
  • 独占/共享锁

    • 独占锁:也就是写锁,同一时刻只能有一个线程可以对数据进行写的操作
    • 共享锁:也就是读锁,同一时刻可以出现多个线程对数据进行读取的操作,且读取的数据都是同一份数据

//开启两个读写线程,分别进行写和读操作 for (int i = 0; i < 5; i++) { final Integer temp = i; new Thread(()->{ mapDemo.put(String.valueOf(temp), temp + 10000); }, "线程->" + String.valueOf(temp)).start(); } for (int i = 5; i < 10; i++) { final Integer temp = i; new Thread(()->{ mapDemo.get(String.valueOf(temp)); }, "线程->" + String.valueOf(temp)).start(); } // 初始化读写锁 private volatile Map<String, Object> map = new ConcurrentHashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void put(String key, Object value) { //写入加锁 readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "开始写入....."); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完毕....."); }finally { //写完释放锁 readWriteLock.writeLock().unlock(); } } public Object get(String key) { //读取加锁 readWriteLock.readLock().lock(); Object object = null; try { System.out.println(Thread.currentThread().getName() + "开始读取----------->"); object = map.get(key); System.out.println(Thread.currentThread().getName() + "读取完成----------->"); }finally { //读取解锁 readWriteLock.readLock().unlock(); } return object; } // 运行结果:发现写入的时候,总是只有一个线程可以在同一时间进行写入,而读取可以多个线程同时读取 线程->1开始写入..... 线程->1写入完毕..... 线程->0开始写入..... 线程->0写入完毕..... 线程->3开始写入..... 线程->3写入完毕..... 线程->2开始写入..... 线程->2写入完毕..... 线程->4开始写入..... 线程->4写入完毕..... 线程->5开始读取-----------> 线程->5读取完成-----------> 线程->7开始读取-----------> 线程->8开始读取-----------> 线程->8读取完成-----------> 线程->6开始读取-----------> 线程->9开始读取-----------> 线程->9读取完成-----------> 线程->7读取完成-----------> 线程->6读取完成-----------> 阻塞队列

  • ArrayBlockingQueue
    • add()与offer()区别:add在超出容量时会抛出异常,而offer则不会抛出异常,而是拒绝添加到队列中!
    • 移除区别(remove()与poll()区别):当队列中无元素时,remove会抛出异常,而poll则是返回null
    • 查看队首(element()与peek()区别):当队列为空时,element会抛出异常,而peek

ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); arrayBlockingQueue.add("A"); arrayBlockingQueue.add("B"); arrayBlockingQueue.add("C"); arrayBlockingQueue.add("D"); // arrayBlockingQueue.add("E"); System.out.println(arrayBlockingQueue); //结果: [A, B, C, D] /** 注意: 1. 当元素超过队列的容量时,就会抛出异常java.lang.IllegalStateException: Queue full 2. 当添加null时,抛出空指针异常 java.lang.NullPointerException 3.使用offer代替add使用 */ //1.错误案例:容量为4,但是添加了五个元素 ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); arrayBlockingQueue.add("A"); arrayBlockingQueue.add("B"); arrayBlockingQueue.add("C"); arrayBlockingQueue.add("D"); arrayBlockingQueue.add("E"); System.out.println(arrayBlockingQueue); //结果: Exception in thread "main" java.lang.IllegalStateException: Queue full //2.错误案例:添加null数据,抛出空指针异常 ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); arrayBlockingQueue.add("A"); arrayBlockingQueue.add("B"); arrayBlockingQueue.add("C"); arrayBlockingQueue.add(null); System.out.println(arrayBlockingQueue); //结果: Exception in thread "main" java.lang.NullPointerException //3.使用offer代替add添加元素 ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); arrayBlockingQueue.offer("A"); arrayBlockingQueue.offer("B"); arrayBlockingQueue.offer("C"); arrayBlockingQueue.offer("D"); arrayBlockingQueue.offer("E"); System.out.println(arrayBlockingQueue); //add与offer区别:add在超出容量时会抛出异常,而offer则不会抛出异常,而是拒绝添加到队列中! //结果: [A, B, C, D]

  • ArrayBlockingQueue 延迟等待

    //延迟添加等待----------------> offer() ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); System.out.println(arrayBlockingQueue.offer("A")); System.out.println(arrayBlockingQueue.offer("B")); System.out.println(arrayBlockingQueue.offer("C")); System.out.println(arrayBlockingQueue.offer("D")); //延迟12秒添加,如果队列已满就返回false(表示添加失败) System.out.println(arrayBlockingQueue.offer("E", 12, TimeUnit.SECONDS)); System.out.println(arrayBlockingQueue); //结果: true true true true false [A, B, C, D] // 延迟取出等待-------------> poll() ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(4); arrayBlockingQueue.offer("A"); arrayBlockingQueue.offer("B"); arrayBlockingQueue.offer("C"); arrayBlockingQueue.offer("D"); arrayBlockingQueue.offer("E", 2, TimeUnit.SECONDS); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll()); System.out.println(arrayBlockingQueue.poll(2, TimeUnit.SECONDS)); // 结果: A B C D //延迟等待2秒钟再弹出,如果队列为空,就返回null null

  • 同步队列(SynchronousQueue)

    • 特性:只能存储一个对象/值,当存入之后必须等待取出之后才能进行再次存入

栗子:

new Thread(()->{ try { synchronousQueue.put(1); System.out.println(Thread.currentThread().getName() + ":put " + 1); synchronousQueue.put(2); System.out.println(Thread.currentThread().getName() + ":put " + 2); synchronousQueue.put(3); System.out.println(Thread.currentThread().getName() + ":put " + 3); } catch (InterruptedException e) { e.printStackTrace(); } } , "put线程:").start(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName() + ":take -> " + synchronousQueue.take()); System.out.println(Thread.currentThread().getName() + ":take -> " + synchronousQueue.take()); System.out.println(Thread.currentThread().getName() + ":take -> " + synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } , "take线程:").start(); //结果: put线程::put 1 take线程::take -> 1 put线程::put 2 take线程::take -> 2 put线程::put 3 take线程::take -> 3 线程池

  • 线程复用(节约了系统资源)

    探索JUC的神秘领域是怎样的体验?

  • 控制最大并发数(当达到线程池容量,就需要等待其他线程完成,才能继续进入)

  • 管理线程

  • Executors线程池

ExecutorService executorService = Executors.newSingleThreadExecutor(); // 单个线程的池子 ExecutorService executorService = Executors.newFixedThreadPool(10); //开启十个固定线程的池子 ExecutorService executorService = Executors.newCachedThreadPool(); //可伸缩线程池, 如果线程池中线程已全被使用就创建新的线程池

newSingleThreadExecutor

ExecutorService executorService = Executors.newSingleThreadExecutor(); // 单个线程的池子 try { for (int i1 = 0; i1 < 10; i1++) { //执行线程 executorService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 执行了线程.."); } }); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //结果:只有一个线程在重复利用执行 pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-1 执行了线程..

newFixedThreadPool

ExecutorService executorService = Executors.newFixedThreadPool(5); //开启十个固定线程的池子 try { for (int i = 0; i < 10; i++) { //执行线程 executorService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 执行了线程.."); } }); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //结果:五个不同的线程重复使用 pool-1-thread-4 执行了线程.. pool-1-thread-1 执行了线程.. pool-1-thread-4 执行了线程.. pool-1-thread-3 执行了线程.. pool-1-thread-2 执行了线程.. pool-1-thread-5 执行了线程.. pool-1-thread-2 执行了线程.. pool-1-thread-3 执行了线程.. pool-1-thread-4 执行了线程.. pool-1-thread-1 执行了线程..

newCachedThreadPool

ExecutorService executorService = Executors.newCachedThreadPool(); //可伸缩线程池, 如果线程池中线程已全被使用就创建新的线程池 try { for (int i = 0; i < 10; i++) { //开启线程 executorService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 执行了线程.."); } }); } } catch (Exception e) { e.printStackTrace(); } finally { //关闭线程池 executorService.shutdown(); } //结果:开启新线程,当已开启的线程执行完毕,放入池子中又可以进行使用,如果开启的线程都还在执行中,就创建新的线程 pool-1-thread-1 执行了线程.. pool-1-thread-6 执行了线程.. pool-1-thread-5 执行了线程.. pool-1-thread-3 执行了线程.. pool-1-thread-4 执行了线程.. pool-1-thread-2 执行了线程.. pool-1-thread-8 执行了线程.. pool-1-thread-7 执行了线程.. pool-1-thread-9 执行了线程.. pool-1-thread-10 执行了线程..

线程池参数

  • 7大参数

public ThreadPoolExecutor(int corePoolSize, //核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //线程存活时间 TimeUnit unit, //线程时间单元 BlockingQueue<Runnable> workQueue) { //阻塞队列 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory() //默认线程工厂 , defaultHandler); //线程拒绝策略(当达到了最大线程数时,采用线程拒绝策略) }

Spring自带的任务执行器线程池

@Bean("scheduledTaskExecutor") public ThreadPoolTaskExecutor scheduledTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //创建任务执行器线程池 executor.setCorePoolSize(3);//设置核心线程数 executor.setMaxPoolSize(5); //设置最大线程数 executor.setQueueCapacity(1024*100); //设置一个队列容量 executor.setThreadNamePrefix("parking-index-task"); //线程名称 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略 executor.initialize(); //初始化线程池 return executor; }

线程四大拒绝策略应用场景

AbortPolicy: 当队列中线程已满,就抛出异常 DiscardPolicy:当队列满了,就丢弃任务,不会抛出异常 CallerRunsPolicy: 队列已满时,就使用调用者的线程去执行,当处理器关闭就丢弃此线程需求 DiscardOldestPolicy:当队列满了,去尝试和较早的线程竞争,当最早的线程即将执行完成就把当前任务使用即将完成的线程执行 源码解释: AbortPolicy: public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always 解释:总是把RejectedExecutionException。Params: r—请求执行的可运行任务e—尝试执行该任务的执行器抛出:RejectedExecutionException—always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } DiscardPolicy: public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task 解释:什么都不做,这有丢弃任务r的效果。参数:r -请求被执行的可运行任务e -试图执行该任务的执行程序 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } CallerRunsPolicy: public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task 解释:在调用者的线程中执行任务r,除非执行器已经关闭,在这种情况下,任务将被丢弃。参数:r—请求执行的可运行任务e—尝试执行该任务的执行程序 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } DiscardOldestPolicy: public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task 解释:获取并忽略执行器将执行的下一个任务(如果有一个任务立即可用),然后重试执行任务r,除非执行器被关闭,在这种情况下,任务r将被丢弃。参数:r—请求执行的可运行任务e—尝试执行该任务的执行程序 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); }

Cpu密集型

int availableProcessors = Runtime.getRuntime().availableProcessors(); //获取Cpu核数,适合设置核心线程池的大小

IO密集型

int availableProcessors = Runtime.getRuntime().availableProcessors(); // int maximumPoolSize = availableProcessors * 2; // Io密集型一般设置为Cpu核数的两倍,防止 ForkJoin

  • 任务拆分

public class DoMain extends RecursiveTask<Long> { private Long start; private Long end; private final Long threshold = 10_0000_0000L; public DoMain(Long start, Long end) { this.start = start; this.end = end; } /** 递归分解大数据,每次进行两段两段操作 */ @Override protected Long compute() { Long res = 0L; if ((end - start) > threshold) { Long middle = (end + start) / 2; //分两次进行计算 ForkJoinTask<Long> fork1 = new DoMain(start, middle).fork(); Long res1 = fork1.join(); ForkJoinTask<Long> fork2 = new DoMain(middle, end).fork(); Long res2 = fork2.join(); res = res1 + res2; } else { for (Long i = start; i < end; i++) { res += i; } } return res; } } //这样创建线程不规范,这里只是简易操作! new Thread(() -> { long l = System.currentTimeMillis(); DoMain doMain = new DoMain(0L, 500_0000_0000L); ForkJoinTask<Long> submit = new ForkJoinPool().submit(doMain); try { System.out.println("forkJoin输出结果:" + submit.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("forkJoin所用时间: " + (System.currentTimeMillis() - l)); }).start(); //这样创建线程不规范,这里只是简易操作! new Thread(() -> { long start = System.currentTimeMillis(); Long res = 0L; for (Long i = 0L; i < 500_0000_0000L; i++) { res += i; } System.out.println("普通循环输出结果:" + res); System.out.println("普通所用时间" + (System.currentTimeMillis() - start)); }).start();

计算对比

stream环输出结果: 124999999750000000
stream所用时间2304
普通循环输出结果:124999999750000000
普通所用时间14116
forkJoin输出结果:124999999750000000
forkJoin所用时间: 14468

stream流计算

new Thread(() -> { long start = System.currentTimeMillis(); long longStream = LongStream.range(0L, 5_0000_0000L).parallel().reduce(0L, Long::sum); System.out.println("stream环输出结果: " + longStream); System.out.println("stream所用时间" + (System.currentTimeMillis() - start)); }).start(); volatile

  • 保证了可见性
  • 不保证原子性(也就是多线程情况下,无法保证同一个值被多个线程修改)
  • 保证了禁止指令重排(当程序启动时,它可能并不是按照我们代码的顺序执行,比如初始化,可能就不是按照我们写的代码步骤来的,这就是指令重排,保证指令不重排就可以使用volatile关键字进行声明)

/** * 1.使用volatile禁止指令重排 * 2. 使用AtomicInteger原子类保证是原子操作 */ public static volatile AtomicInteger num = new AtomicInteger(); public static void main(String[] args) { for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { //进行加一操作 num.getAndIncrement(); } }).start(); } //当线程数大于2时,暂停main线程,让给其他线程执行 while (Thread.activeCount() > 2) { Thread.yield(); } System.out.println(num);

原子类操作源码

public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } //原子类底层代码,使用了CAS(比较替换算法,也就是自旋锁) // compareAndSwapInt底层是调用c++操作内存,对应的是native关键字 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } CAS简单实现

栗子:

@SneakyThrows public static void main(String[] args) { CasLock casLock = new CasLock(); new Thread(()->{ try { casLock.lock(); }catch (Exception e) { e.printStackTrace(); }finally { casLock.unLock(); } }, "Thread1").start(); TimeUnit.SECONDS.sleep(2); new Thread(()->{ try { casLock.lock(); }catch (Exception e) { e.printStackTrace(); }finally { casLock.unLock(); } }, "Thread2").start(); } public static class CasLock { AtomicReference<Thread> lock = new AtomicReference<>(); public void lock() { Thread thread = Thread.currentThread(); if (lock.get() == null) { //拿到泛型中Thread的值进行比较 System.out.println(thread.getName() + "----> 开始自旋..."); } while (lock.compareAndSet(null, thread)) { } } public void unLock() { Thread thread = Thread.currentThread(); System.out.println(thread.getName() + "----> 解锁成功!"); //解锁 lock.compareAndSet(thread, null); } }