DomBro Studio

Java并发编程基础.md

2018/04/13

Java 并发编程基础

引子

最近在撸 《Java并发编程的艺术》这本书,原因有二。其一,并发、线程、是一名 Java 后台人员永远无法跳过绕开的话题;其二,最近面试被所有面试官统统被问到并发,遂下定决心,用心去看一下,我并不想去刷面试题糊弄面试官。

是本好书

不得不承认 《Java并发编程的艺术》 是一本好书,与我所看过的讲解线程的博客不同,作者几乎用前三章在解释很底层的问题 —— Java 并发究竟是什么、虚拟机和系统如何让线程并发的工作。尤其是第三章 —— Java 内存模型,简直底层的不要不要的。前三章使用检视阅读看了一点,在了解了一些名词含义后毅然决然开始了第四章。

线程

作者介绍了了什么是线程。这里我跳了,因为我知道什么是线程啊。之前还做过笔记 ,戳我跳转

线程间的通信

线程间的通信不是想象中你一言我一语的瞎唠嗑,而是一个线程可以从另一个线程中得到一些有价值有意义的东西。线程的通信可以让线程相互配合的完成工作,这会变得很有价值。所以请记住本节的关键字——通信

  • 线程的栈空间

再继续进行之前,你需要知道一件事情,每个线程都有一个自己的栈空间,按照代码一步一步的执行

  • 本地内存

线程虽然共享内存,但是每个线程还是只有自己的 “私密空间” 的,这个私密空间可以用来存储共享变量的副本,我们叫这个私密空间为本地内存。本地内存只是 JMM (Java Memory Model) 中的一个抽象概念,并不真实存在的。这个本地内存可以放置共享变量的拷贝,这样做肯定是为了加速程序执行。

volatile 和 sychronized

本节你会看到 volatile 和 sychronized 这两个关键字是如何使线程通信的。

volatile 关键字

是的,你可能从来没有见过这个关键字,或者从来没使用过这个关键字(我也一样 T_ T)。但是,如果告诉你 volatile 关键字是实现并发编程中扮演着重要角色,你慌不慌?

  • 一个场景&问题的提出

为什么要在上面介绍本地内存这样一个概念呢 ? 我们知道,每个线程为了更快速地执行程序,会把主内存中的共享变量拷贝到线程的本地内存中,然后读取本地内存中的共享变量即可。然而在多线程下,共享变量 x 有可能在主内存中被 线程A 拷贝到本地内存后,线程B 修改了主内存中的 共享变量 x,但是线程A 却并不知道 x 的值已经改变了,继续本地内存中读取 x 的拷贝,这一定会造成程序出错。因此在程序执行过程中一个线程看到的变量不一定是最新的,如何解决?

  • volatile 的作用&问题的解决

上面的问题答案很好给出,只要让线程都去读取 主内存 中的共享变量不就得了。没有错 关键字volatile 就是起这个作用的。

1
2
/**代码示例 1 使用 volatile 关键字修饰字段**/
volatile boolean flag = true;

像代码示例一中使用 volatile 修饰字段,就会让内存模型中所有线程在读取该变量时,在主内存中读取,从每个线程都会得到最新的共享变量值。 通过这种方法,volatile关键字让修改 线程A 读取到看了 被线程B 修改后的 共享变量x,达到了线程通信的目的。 需要注意的是,禁令避免大量使用 volatile 关键字,会影响效率。

sychronized 关键字

我想这个关键字不会像 volatile 那么陌生,同样 sychronized 也是并发编程中的重要角色,我们一般叫他同步锁。那么它是如何让线程间进行通信的呢?

  • sychronized 通信的内容

众所周知,sychronized 可以修饰用来修饰方法,也可以把使用 sychronized 同步代码块。这两种方式都可以实现同步。同步的意思可以理解为一个被同步访问的对象一次只可以被一个线程访问,其他线程需要等待。那好了, sychronized 在线程间的通信内容就是一个线程告诉另一个线程”嘿,哥们,我正在访问这个对象,你排队去!”

  • Monitor 监视器

知道了使用 sychronized 关键字在线程间的通信内容,我们来看一下原理。虽然同步方法和同步代码块实现同步的细节不一样,但是其原理都是一样滴,本质都是线程对一个对象(被同步对象)的监视器 Monitor 进行获取,这个获取过程是排他性的,即在一个时刻只能有一个线程获取到 sychronized 保护对象的监视器。如果获取监视器失败的线程,则会进入等待队列。对了,每个对象都有自己的监听器。这个监听器是在底层实现的,你只知道有这样一个东东就可以了。

  • 感受

以前从来没觉得 sychronized 是在线程通信的额范畴之内,只知道使用,知道现在才突然明白。就算是同步也要在线程通信的基础上才可以完成。

等待/通知

如果一个线程改变了一个值,而另一个线程感受到了变化,然后进行相应操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者是消费者,这种模式隔离了”做什么”和”怎么做”,在功能上实现了解耦。在 Java 语言中使用 Object.wait() 和 Object.notify() 即等待通知来实现(这方法一定是见过的)。

  • 等待通知机制

等待通知机制是指当 线程A 调用 对象O 的wait()方法进入等待状态,线程B 调用 对象O 的 notify() 或 notifyAll(),线程A 收到通知后从 wati() 方法返回执行下一步操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* 代码清单2 一个简单的等待通知的例子
*/
public class WaitNotify {

static boolean flag = true;
static Object lock = new Object();

//等待的线程,等待的条件是 flag = true
static class Wait implements Runnable{

@Override
public void run() {
//在 lock 上加锁
synchronized (lock){
//当条件不满足时,继续wait()
while (flag){
try {
System.out.println(Thread.currentThread()+"flag is true ,wait "+ LocalDateTime.now().toString());
lock.wait();
} catch (InterruptedException e) {
}
}

//当条件满足时,完成工作
System.out.println(Thread.currentThread()+"flag is false ,run "+ LocalDateTime.now().toString());
}
}
}


static class Notify implements Runnable{

@Override
public void run() {
synchronized (lock){
System.out.println(Thread.currentThread() + " hold lock . notify " + LocalDateTime.now().toString());
//通知,但注意此时并不会释放lock的锁,即不会直接执行 wait 线程
lock.notifyAll();
//修改条件
flag = false;
try {
//休眠五秒
Thread.sleep(5000);
} catch (InterruptedException e) {

}
}

//再次加锁
synchronized (lock){
System.out.println(Thread.currentThread()+" hold lock .again sleep " + LocalDateTime.now().toString());
//休眠五秒
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
}
}
}


public static void main(String[] args) throws InterruptedException {
Thread waitThread = new Thread(new Wait(),"WaitThread");
waitThread.start();

TimeUnit.SECONDS.sleep(1);

Thread notifyThread = new Thread(new Notify(),"NotifyThread");
notifyThread.start();
}

}

运行结果

1
2
3
4
Thread[WaitThread,5,main]flag is true ,wait 2018-04-19T14:44:23.727
Thread[NotifyThread,5,main] hold lock . notify 2018-04-19T14:44:24.711
Thread[WaitThread,5,main]flag is false ,run 2018-04-19T14:44:29.711
Thread[NotifyThread,5,main] hold lock .again sleep 2018-04-19T14:44:29.711

运行结果的第三行和第四行顺序可能会调换。通过这个简单的例子来阐述一下等待通知的几个细节

1.使用 wait() nofity() nofityAll() 方法之前一定要先对调用对象加锁。
2.通知线(调用 notify() 线程)程在 nofity() 或 notifyAll() 之后,等待线程并不会从 wait() 方法返回而在通知线程退出锁之后,等待线程才有机会从 wait() 方法返回。
3.等待线程从 wait() 方法返回的前提是获得了调用对象的锁。

  • 等待/通知的经典范式

从 代码清单2 可以推出等待通知的经典范式。
等待方遵循如下原则:
1.获取对象的锁。
2.如果条件不满足,那么调用 对象的wait() 方法,被通知后仍要检查条件。
3.条件满足执行对应逻辑。

1
2
3
4
5
6
7
8
/**代码清单3 等待方的伪代码***/

synchronized(对象){
while(条件不满足){
对象.wait();
}
条件满足对应的处理逻辑
}

通知方遵循如下原则:
1.获得对象的锁。
2.改变条件。
3.通知所有等待在对象上的程序。

1
2
3
4
5
6
7
/***代码清单4 通知方的伪代码***/

synchronized(对象){
改变条件
对象.nofityAll();

}
  • 我的发现

之前就看线程时就被告知,等待/通知 都会释放锁,现在终于明白为什么了,如果和 sleep() 方法一样不释放锁,那么等待就会一直等待,不会给同样把对象加了锁的通知方执行的机会!

  • 等待/通知的实用超时范式

在开发时往往会遇到下面这样的场景:调用一个方法时等待一段时间(一般来说是给定一段时间),如果该方法在指定时间段内得到结果,那么结果立刻返回,反之,超时返回默认值。上面提到的 等待/通知经典范式无法做到超时等待。但是等待超时的加入,只需要对经典范式做出非常小的改动,改动如下。

1.设置等待持续时间 remaining ,超时时间等于 future = now + remaining 。
2.使用对象的 wait(remaining) 方法,在 wait(remaining) 返回后将会执行 remaining = future - now ,如果 remaining < 0 表示已经超时,直接退出。否则,继续执行 wait(remaining)。所以顾名思义,等待超时就是在等待超时退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/***代码清单5 等待超时模式 ****/

public synchronized Object get(long mills) throws InterruptedException {
//超时时间
long future = System.currentTimeMillis() + mills;
//持续等待时间
long remaining = mills;
//当超时大于 0 并且 result 返回值不满足要求时
while ((result == null) && remaining > 0){
//如果在 remaining 时间段内,没有得到通知,会自动返回
wait(remaining);
remaining = future - System.currentTimeMillis();
}
return result;
}
}

可以看出来,等待超时模式就是在 等待/超时模式基础上面增加了超时控制,这使得该模式比原有范式更具有灵活性,因为即使在规定时间内没得到通知,也会按时返回,不会永久的陷入阻塞(一直等待)。

ThreadLocal 的使用

之前听都没听过 ThreadLocal 这个家伙。 ThreadLocal ,线程变量,是一个以 ThreadLocal 对象为键,任意对象为值的存储结构。这个结构被负在线程上,也就是说一个线程的可以根据一个 ThreadLocal 对象查询到绑定在这个线程上的一个值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 代码清单6 一个测试 ThreadLocal 的工具类
*/
public class Pofiler {

private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){
//返回此线程局部变量的初始值,如果没有set值吗,则第一次 get 则会将该值获取出来
protected Long initialValue(){
return System.currentTimeMillis();
}
};


public static final void begin()
{
//为当前线程设置线程变量
TIME_THREADLOCAL.set(System.currentTimeMillis());
}


public static final long end(){
//get() 获取当前线程的时间变量
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}

public static void main(String[] args) throws InterruptedException {
Pofiler.begin();
TimeUnit.SECONDS.sleep(1);
System.out.println("Cost : "+ Pofiler.end() +" mills");
}
}

代码清单6 中的 Profiler 可以被复用在方法调用的耗时上,在方法的入口前执行 begin() 方法,在方法的调用后执行 end() 方法,这样做的好处是这两个方法的调用不在同一个方法或类中调用。比如说 AOP 中。

  • 我的发现

开始并没有在 ThreadLocal 中找到线程通信的蛛丝马迹,实际上 ThreadLocal 是在为调用的线程设置一个值。

线程池技术

熟悉线程的人一定会对线程池有印象。为什么要使用线程池?如果服务端每次接到一个任务就,就创建一个线程,在原型阶段是不错的选择,当面临成千上万个任务提交到服务端时,那将创建上万个线程,将会引起大量的上下文切换,增加系统的负担,还有线程的创建和消亡都需要耗费系统资源,无疑浪费了系统资源。线程池技术能够很好地解决这个问题,线程池会预先创建若数量线程,并且不能由用户直接对线程的创建进行控制,
在这个前提下,重复使用固定数目线程执行完成任务的执行。
这样做的好处很明显,减少了大量线程创建和消亡线程的系统资源开销,另外面对过量任务的提交能够平缓的劣化(老实说这句话啥意思…)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/***代码清单7 自定义的线程池***/
/**
* 一个简单的线程池接口
* Task 任务
*/
public interface ThreadPool<Task extends Runnable> {

//执行一个任务
void execute(Task task);
//关闭线程池
void shutdown();
//增加执行任务线程
void addWorkers(int num);
//减少执行任务线程
void removeWorkers(int num);
//得到正在等待执行的任务数量
int getTaskSize();
}

//实现 ThreadPool 的默认线程池
public class DefaultThreadPool<Task extends Runnable> implements ThreadPool<Task> {

//线程池最大线程数
private static final int MAX_WORKER_NUMBER = 10;
//线程池默认线程数
private static final int DEFAULT_WORKER_NUMBER = 5;
//线程池最小的线程数
private static final int MIN_WORKER_NUKBER = 1;
//任务列表
private final LinkedList<Task> tasks = new LinkedList<>();
//执行任务的线程
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
//执行任务线程数量
private int workerNum = DEFAULT_WORKER_NUMBER;
//线程编号
private AtomicLong threadNumber = new AtomicLong();

//默认生成5条执行任务线程
public DefaultThreadPool(){
initializeWorkers(DEFAULT_WORKER_NUMBER);
}

public DefaultThreadPool(int num){
workerNum = num > MAX_WORKER_NUMBER ? MAX_WORKER_NUMBER : num < MIN_WORKER_NUKBER ? MIN_WORKER_NUKBER : num;
initializeWorkers(num);
}

//初始化完成执行任务线程
private void initializeWorkers(int num) {
for (int i = 0;i < num;i++){
Worker worker = new Worker();
workers.add(worker);
Thread workerThread = new Thread(worker,"ThreadPool-Worker-"+threadNumber.incrementAndGet());
workerThread.start();
}
}


@Override
public void execute(Task task) {
//如果任务不为空,向任务列表中添加 task 然后进行通知 -> 通知会使该任务执行
if (task != null){
synchronized (tasks){
tasks.add(task);
tasks.notify();
}
}
}

@Override
public void shutdown() {
for (Worker worker : workers){
worker.shutdown();
}
}

@Override
public void addWorkers(int num) {
synchronized (tasks){
if (this.workerNum + num > MAX_WORKER_NUMBER){
num = MAX_WORKER_NUMBER - this.workerNum;
}
initializeWorkers(num);
this.workerNum += num;
}
}

@Override
public void removeWorkers(int num) {
synchronized (tasks){
if (num >= this.workerNum){
throw new IllegalArgumentException("beyond workerNumber");
}
//按照给定的数量停止 Worker
int count = 0;
while (count < num){
Worker worker = workers.get(count);
if (workers.remove(worker)){
worker.shutdown();
count++;
}
}
}
}

@Override
public int getTaskSize() {
return tasks.size();
}

class Worker implements Runnable{
//是否工作
private volatile boolean running = true;
@Override
public void run() {
while (running){
Task task = null;
synchronized (tasks){
//如果任务列表是空的则一直等待
while (tasks.isEmpty()){
try {
tasks.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
task = tasks.removeFirst();
}
//取出链表中的第一个任务并执行
if (task != null){
task.run();
}
}
}
public void shutdown(){
running = false;
}
}


}

一起来看一下这个线程池实现
1.ThreadPool 定义了线程池的基本操作,执行任务、关闭线程池等。
2.自定义的线程池 DefaultThreadPool 是由两个重要的数据结构组成 任务链表 LinkedList tasks 以及 执行任务线程列表 List workers 组成(注意这个线程列表时线程安全的)。
3.再看一下初始化方法 initializeWorkers 将 执行任务的线程 进行初始化,传入参数表示生成的线程数量。
4.Worker 是用来循环执行 Task 的,当任务列表不为空,且拿出来的任务不为空就调用任务的 run() 方法,为空则等待。
5.execute 方法是添加一个任务,他要做两件事,首先将任务添加到任务列表中,其次唤醒等待的线程(即执行任务的线程)。
6.注意到在对任务进行操作时,都会对任务列表上锁,这保证了任务执行的安全性。
7.关闭线程池的方法很简单,只要让 所有执行任务的线程不在执行任务就可以啦!
8.可以看到,数据结构+算法的威力。

总结

笔记没有介绍线程API的基本知识,而是重点介绍了线程之间的通信,之所以会出现多线程编程,主要就是因为Java在底层实现的线程通信机制。另外也介绍了线程池的基本实现,希望通过这个自定义线程池,让大家对多线程加深印象。

CATALOG
  1. 1. Java 并发编程基础
    1. 1.1. 引子
    2. 1.2. 是本好书
    3. 1.3. 线程
    4. 1.4. 线程间的通信
      1. 1.4.1. volatile 和 sychronized
        1. 1.4.1.1. volatile 关键字
      2. 1.4.2. sychronized 关键字
      3. 1.4.3. 等待/通知
      4. 1.4.4. ThreadLocal 的使用
    5. 1.5. 线程池技术
    6. 1.6. 总结