Memorykk

never too late to learn

JUC

java.util.concurrent

1.前言

1.1 什么是JUC

JUC —— (java.util.concurrent)是一个包名的缩写,该包下存放的均为多线程相关类


1.2 Java中默认有几个线程?

一共有 2 个

  • Java程序线程
  • GC回收器线程

2. 线程

2.1 线程的六大状态

  • NEW 新生
  • RUNNABLE 运行
  • BLOCKED 阻塞
  • WAITING 等待
  • TIMED_WAITING 超时等待
  • TERMINATED 死亡

2.2 wait与sleep的区别

  1. 来自不同的类

    wait -> Object

    sleep -> Thread

  2. 关于锁的释放

    wait:释放锁

    sleep:不释放锁、

  3. 使用的范围不同

    wait只能在synchronized中使用

    ​ 为什么?看我原创整理笔记👇

    Java线程之为何wait()和notify()必须要用同步块中 - VioletTec’s Blog (mcplugin.cn)

    sleep可以在任意地方使用


3. Lock(锁)

Lock是一个接口,位于java.util.concurrent.locks包下

其有多个实现类。

image-20210107194254581

3.1 可重入锁ReentrantLock

默认情况下,ReentrantLock的构造方法默认是new一个不公平(unfair)锁

image-20210107194210736

但是在构造方法中传入一个boolean,即可控制new的锁是否为公平锁

true:公平锁(fair lock)

false:不公平锁(unfair lock)

为什么默认要用非公平锁?

因为公平。因为如果使用公平锁,会有可能导致执行耗时长的线程优先执行,会导致CPU使用效率下降。


3.1.1 公平锁和非公平锁

  • 公平锁:先来后到(必须是先来的先执行)

  • 非公平锁:非前来后到,可插队(根据CPU进行调度)


3.2 Lock与synchronized的区别

  • Synchronized是内置关键字,Lock是一个类
  • Synchronized无法判断是否获取到了锁,Lock可判断是否获得到了锁
  • Synchronized会自动获取和释放锁
  • Synchronized 线程 1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去
  • Synchronized 可重入锁,不可以中断的,非公平
    Lock :可重入锁,可以判断锁,非公平(可以自己设置)
  • Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码

3.3 虚假唤醒

当一个条件满足时,很多线程都被唤醒了,但是只有其中部分是有用的唤醒,其它的唤醒都是无用功
1.比如说买货,如果商品本来没有货物,突然进了一件商品,这是所有的线程都被唤醒了,但是只能一个人买,所以其他人都是假唤醒,获取不到对象的锁

比如一下代码:

package duoxiancheng.bao;

/*
* 虚假唤醒的解决:
* wait要始终保证在while循环当中。
*/
public class LockTest {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Producter producter = new Producter(clerk);
Customer customer = new Customer(clerk);

new Thread(producter,"生产者A").start();
new Thread(customer,"消费者A").start();
new Thread(producter,"生产者B").start();
new Thread(customer,"消费者B").start();
}
}

// 售货员
class Clerk {
private int product = 0;

// 进货
public synchronized void add() {
// 产品已满
while (product >=1) {
System.out.println(Thread.currentThread().getName() + ": " + "已满!");
try {
this.wait();
} catch (InterruptedException e) {
}
}
++product;
// 该线程从while中出来的时候,是满足条件的
System.out.println(Thread.currentThread().getName() + ": " +"....................进货成功,剩下"+product);
this.notifyAll();
}

// 卖货
public synchronized void sale() {
while (product <=0) {
System.out.println(Thread.currentThread().getName() + ": " + "没有买到货");
try {
this.wait();
} catch (InterruptedException e) {
}
}
--product;
System.out.println(Thread.currentThread().getName() + ":买到了货物,剩下 " + product);
this.notifyAll();
}
}

// 生产者
class Producter implements Runnable {
private Clerk clerk;

public Producter(Clerk clerk) {
this.clerk = clerk;
}

// 进货
@Override
public void run() {
for(int i = 0; i < 20; ++i) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
clerk.add();
}
}
}

// 消费者
class Customer implements Runnable {
private Clerk clerk;
public Customer(Clerk clerk) {
this.clerk = clerk;
}

// 买货
@Override
public void run() {
for(int i = 0; i < 20; ++i) {
clerk.sale();
}
}
}

当add了一个产品时,会notifyAll,唤醒所有线程。但是并非所有线程都需要sale一个产品。

如果使用if,若当一个线程执行完if(product >= 1)后跳过wait语句,然后将CPU时间让出。

如果重复以上步骤,有许多线程都出现该问题时,当他们返回CPU现场,获得CPU运行时间的时候,则会继续执行sale方法,导致多个线程同时sale。如果加上while,当一个县城跳过wait时,让出CPU后,再获得CPU时,会跳到while(product >= 0)重新判断一次,防止虚假唤醒

(add同理)


4. 8锁的现象

意义:加深我们对被锁的物体的理解。

具体代码:见视频,懒得写了(dog


5. Callable(简单)

callable特点:可以返回内容,可以抛出异常

package com.kuang.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.ReentrantLock;
/**
* 1、探究原理
* 2、觉自己会用
*/
public class CallableTest {
public static void main(String[] args) throws ExecutionException,InterruptedException {
// new Thread(new Runnable()).start();
// new Thread(new FutureTask<V>()).start();
// new Thread(new FutureTask<V>( Callable )).start();
new Thread().start(); // 怎么启动Callable
MyThread thread = new MyThread();
FutureTask futureTask = new FutureTask(thread); // 适配类
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start(); // 结果会被缓存,效率高
Integer o = (Integer) futureTask.get(); //这个get 方法可能会产生阻塞!把他放到
最后
// 或者使用异步通信来处理!
System.out.println(o);
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() {
System.out.println("call()"); // 会打印几个call
// 耗时的操作
return 1024;
}
}

6. 常用的辅助类(必会)

6.1 CountDownLatch

允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助

CountDownLatch coutnDownLatch = new CoutDownLatch(6);//减法计数器
for(int i=1;i<=6;i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName + "go out");
coutDownLatch.countDown();//计数器-1
},String.valueof(i)).start();
}
coutDownLatch.await();//等待计数器归零
System.out.println("Close the Door!");

CountDownLath常用方法:

  • new CoutDownLatch(int count);//构造方法,用于初始化计数器的值
  • countDown();//计数器-1
  • await();//等待线程直到计数器为0为止

6.2 CyclicBarrier

允许一组线程全部等待达到彼此共同的屏障点的同步辅助。

可以理解为:加法计数器

//集齐七颗龙珠召唤神龙。
CyclicBarrier cyclicbarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功!");
});
for(int i=1;i<=7;i++){
new Thread(()->{
System.out.println("收集了"+i+"颗龙珠");
},String.valueof(i)).start();
}
cyclicBarrier.await();//等待计数器变成7时

CyclicBarrier和CountDownLatch的区别就是:
CountDownLatch不可以重置计数。

image-20210129164950688

如果想要重置计数,可以使用CyclicBarrier。

image-20210129165010297


6.3 Semaphore

Semaphore:信号量

一个计数信号量,在概念上,信号量维持一组许可证。如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它

//acquire() 得到
//release() 释放
package com.kuang.add;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
// 线程数量:停车位! 限流!
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
// acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到车
位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车
位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // release() 释放
}
},String.valueOf(i)).start();
}
}
}

原理:

semaphore.acquire() 获得,假设如果已经满了,等待,等待被释放为止!

semaphore.release(); 释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!


6.4 Exchanger(笔者补充)

允许两个线程在集合点交换对象,并且在多个管道设计中很有用。

简单来说,Exchanger的.exchange(Object obj)方法,是一个阻塞的交换点。

当两个线程都在.exchange()阻塞时,进行交换。

class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<>();

//下面是两个装数据的实体类
DataBuffer initialEmptyBuffer = ... a made-up type
DataBuffer initialFullBuffer = ...

//FillingLoop : 不断向DataBuffer中添加数据,并交换给EmptyingLoop
class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);//从currentBuffer里取出一个数据
if (currentBuffer.isFull()){
//如果是装满了,就交还给EmptyingLoop类进行消费
currentBuffer = exchanger.exchange(currentBuffer);
}
}
} catch (InterruptedException ex) { ... handle ... }
}
}

//EmptyingLoop : 不断从DataBuffer中取出数据,并交换给FillingLoop
class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty()){
//如果消费完了,则交换buffer给FillingLoop进行生产
currentBuffer = exchanger.exchange(currentBuffer);
}
}
} catch (InterruptedException ex) { ... handle ...}
}
}

void takeFromBuffer(DataBuffer dataBuffer){
//.....
dataBuffer.take();
//......
}
void addToBuffer(DataBuffer dataBuffer){
//.....
dataBuffer.add(xxxxx);
//......
}
void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}

7. 读写锁

  • 独占锁(写锁) 一次只能被一个线程占有
  • 共享锁(读锁) 多个线程可以同时占有
  • ReadWriteLock
  • 读-读 可以共存!
  • 读-写 不能共存!
  • 写-写 不能共存!

8. 阻塞队列

BlockingQueue:阻塞队列

image-20210123122529714

SynchronousQueue 同步队列:

和其他的BlockingQueue不一样。

没有容量, 进去一个元素,必须等待取出来之后,才能再往里面放一个元素!

只有 put()、take()两个方法


9. 线程池(重点)

池化技术

程序的运行,本质:占用系统的资源!优化资源的使用=》池化技术

线程池、连接池、内存池、对象池

创建和销毁线程需要从用户态内陷到内核态进行操作,是一个耗时的操作。

线程池:三大方法、7大参数、4种拒绝策略

池化技术:池化技术简单点来说,就是提前保存大量的资源,以备不时之需。

↓↓↓为了方便理解线程池,我插入一些题外话,方便深入理解线程池↓↓↓:


什么是线程?

线程是调度CPU的最小单元,也叫做轻量级进程LWP(Light Weight Process)
Java中有两种线程模型:

  1. 用户级线程(ULT)(Uer Level Thread)
  2. 内核级线程(KLT)(Kernel Level Thread)
  • ULT:用户程序实现,不依赖操作系统核心,应用提供创建,同步,调度和管理线程的函数来控制用户线程,不需要用户态/内核态的切换,速度快,内核对ULT无感知,线程阻塞则进程(包括它的所有线程)阻塞。
  • KLT:系统内核管理线程(KLT),内核保存线程的状态和上下文的信息,线程阻塞不会引起进程阻塞。在处理器系统上多线程在多处理器上并行运行。现成的创建,调度和管理由内核完成,效率比UTL慢,比进程操作快。

市面上绝大多数的Java虚拟机都是使用的KLT,及系统内核管理线程
文字描述比较抽象,我们来画一个图描述一下ULTKLT的区别


undefined


JVM由于在用户空间,无权使用内核空间,只能调用系统开放的API(如:Linux开放的p_thread函数)去操作线程,在映射到底层的CPU上,由于调度API需要提高权限,所以会把自身状态陷入到内核态来取得权限。
用户所有的线程都会存放在线程表中,由内核统一的调度和维护。
这就是为什么会进行用户态/内核态状态切换的原因。


undefined


根据上图,JVM创建和执行线程可以列为一下这么几个步骤

  1. 线程会使用库调度器
  2. 之后陷入到内核空间
  3. 创建内核线程
  4. 内核中的线程会被维护到线程表中
  5. 由操作系统调度程序去调度
  6. CPU会根据调度算法分配时间
  7. 把没有执行完的线程写入给你高速内存区(SSP)

内核空间中有一个高速内存区SSP(程序任务运行状态段)是用于存储还没有执行完成,但是被分配的时间已经用完的线程中的数据,,等待下一次被分配到了时间后,就把保存在SSP里的上下文信息加载到CPU的缓存。

综上所述,线程是一个稀缺资源,他的创建和销毁是一个相对偏重且消耗资源的操作,而Java线程依赖于内核进程,创建线程需要进行操作系统状态切换,为避免过度消耗,我们要设法重用线程执行多个任务。


线程池就是一个线程缓存,负责对线程进行统一分配,调度与监控

他的优点有很多,最突出的优点就是:

  • 重用存在的线程,减少线程的创建,消亡所用开销,提升性能
  • 提高响应速度。当任务到达时,可以不需要等待线程的创建就可以立即执行
  • 提高线程的可管理性,可统一分配,调度和监控。

那么线程池是如何把线程统一分配调度与监控的呢?


img
【画图太累了,我就用的网图,图片来源视频:https://www.bilibili.com/video/av88030891

img
【画图太累了,我就用的网图,图片来源:https://blog.csdn.net/lchq1995/article/details/85230399


我们在NEW一个最基本的线程池的时候,会传入这么一下几个参数:
corePoolSize:线程池核心线程数量
maximumPoolSize:线程池最大线程数两
keepAliveTime:空闲线程存活时间

  • corePoolSize顾名思义就是最大核心线程数量,是线程池可以同时执行的线程数量。
  • maximumPoolSize,既然有corePoolSize,那么如果corePoolSize满了怎么办呢?这时候就会用到一个队列,叫阻塞队列Block Queue
    什么是BlockQueue?它有什么特点?
    既然是Queue,那么久满足队列模型的(FIFO)原则,一端放入,另一端取出。(First In First Out)
    阻塞队列有一个特点就是:在任意时刻,不管并发量有多高,永远只有一个线程能进行队列的如对或出队,所以BlockQueue是一个线程安全的队列。
    并且如果队列满了,只能进行出队操作,所有入队操作必须等待,也就是阻塞。
    如果队列为空,那么就只能进行入队操作,所有出队操作必须等待,也就是阻塞。
    一旦线程池的线程量满了,那么新被execute进来的线程,就会被存储进BlockQueue,BlockQueue的大小就是maximumPoolSize - corePoolSize的大小。

线程池和五种状态:

  • Running:能接受新的execute以及处理已添加的任务
  • Shutdown:不接收任何新的execute,可以处理已添加的任务
  • Stop:不接受任何新的execute,不处理已添加的任务
  • Tidying:所有任务已经终止,ctl记录的任务数量为0.
  • Termiated:线程池彻底终止,则线程池转换为Terminated状态。

img


那么这么多线程池状态和这么多线程的信息,是如何保存的呢?
这里线程池内部用到了一个32字节的Integer类型来记录线程池的状态和线程数量信息。


img


这个Integer类型的高3未二进制用来表示线程池的状态,后29为用来表示线程的数量。
线程池定义了这么几个数字作为线程的状态

RUNNING = -1
SHUTDOWN = 0
STOP = 1
TIDYING = 2
TERMINATED = 3

并且所有数字都想做移位29位。
<< COUNT_BITS(COUNT_BITS=29)

最终会得到高三位为:
RUNNING = 111
SHUTDOWN = 000
STOP = 001
TIDYING = 010
TERMINATED = 011


他是怎么得到的呢?

我们来回顾一下基础

拿-1来举个例子
众所周知,
1在32位Integer的类型中二进制为:
0000 0000 0000 0000 0000 0000 0000 0001
那么-1就应该去1的反位后+1,并且再加上一个符号位1000
则,-1就应该为:
1000 1111 1111 1111 1111 1111 1111 1111
那么-1向左移位29位,低位补0,那么则
-1<<29 等于
1110 0000 0000 0000 0000 0000 0000 0000
所以高三位为111


所以这就是RUNNING的高三为为111的由来
后面的29位用于存储线程的数量。


这种应用基本数据高效存储的思想可以用于存储一些记录,有点就是不用去多个变量的读取,提升速度。


具体线程池的实现可以百度搜索JAVA线程池的实现,在这里只是浅谈一下线程池的好处以及浅层原理。


↑↑↑以上为插入的题外内容,方便对线程池的理解。转载自本人原创博客。↑↑↑

原文地址:https://blog.mcplugin.cn/p/225


根据阿里巴巴的《Java开发手册》中

image-20210123150117054

不允许使用Executors去创建线程/线程池,而是使用ThreadPoolExecutor

为什么会发生OOM?因额外FixedThreadPool、SingleThreadPool的允许请求的最大队列长度,以及CachedThreadPool允许创建的最大线程数量为Integer.MAXVALUE,Integer最大值为21亿多(2147483647),若发生意外,则容易在线程池的队列中发生OOM

禁止以下列形式创建线程/线程池❌↓

ExecutorService threadPool1 = Executors.newSingleThreadExecutor();// 单个线程
ExecutorService threadPool2 = Executors.newFixedThreadPool(5); // 创建一个固定的线程池的大小
ExecutorService threadPool3 = Executors.newCachedThreadPool(); // 可伸缩的,遇强则强,遇弱则弱

通过查看Executors.newXxxThreadPool();的方法,我们可以看到它底层都使用了ThreadPoolExecutor类。

image-20210123151334625

我们可以看到CachedThreadPool的最大线程数量的确为Interger的最大值,即2147483647

即,Executors的本质还是使用了ThreadPoolExecutor();

我们接着来看ThreadPoolExecutor()的构造方法:

image-20210123151506816

最大线程应如何定义?

  • CPU 密集型 -> 有几个核心就是可以同时并行几个
    • 将最大线程可以定义为CPU的核心数量
    • 获取CPU核心数量: Runtime.getRuntime().availableProcess();
  • IO 密集型 -> 判断你的程序中十分耗IO的线程有多少个。至要大于这个数字就可以了!一般是耗IO线程数量的2倍

10. 四大函数式接口(重点、必须掌握)

新时代程序员特色社会主义编程技能:

  1. lambda表达式
  2. 链式编程
  3. 函数式接口
  4. 流式计算

函数式接口:只有一个方法的接口。这个接口有一个注解:@FunctionalInterface

学习目的:简化编程模型,在新版本的底层大量应用。

如:list.foreatch(消费者类型的函数式接口);

四大原生的函数式接口:

  1. Consumer
  2. Function
  3. Predicate
  4. Supplier

就记到这里,下面都没在手动记笔记了…懒了。