Java-10_1-并发编程
并发编程适用于执行时需要长等待的任务(例如网络任务),通过并发编程,可以让这类任务在等待时让出处理器。现今,大多数计算系统已为多处理器或多核系统,并发编程的任务则是尽量让所有的处理器核心都保持忙碌,而不是停住等待耗时程序。
本书只会介绍应用层的一些实现技巧,对于更加底层的并发实现,推荐书籍 Java Concurrency in Practice (Author: Brian Goetz, et.al)。
A. 并发任务基础
在计算机系统中,线程(thread)是由操作系统提供的最小的 顺序执行程序 的机制,从底层上,一般多个线程并发执行,通过使用不同的处理器/处理器核心 或者 同一核心的不同时间片段来实现。
a. 执行器服务
Java 会通过 executor service 来自动安排程序执行所使用的 线程。常用的有两种执行器服务:
1 | // newCachedThreadPool() 没有线程数量限制 |
使用固定线程数量时,线程的数量可以参考系统可供使用的处理器个数,可以通过下述方案获的该值:
1 | int processors = Runtime.getRuntime().availableProcessors(); |
b. Runnable 接口
在 Java 中,想让程序能够并发执行,可以让待执行的任务所在的类实现 Runnable
接口,并将需要并发执行的任务写在 Run()
方法中。具体操作如下:
1 | // Runnable 为 functional interface,可以使用 lambda 表达式 |
c. Callable 接口
Runnable 执行任务时没有返回值,如果想执行并发任务时拥有返回值,则可以使用 Callable<V>
接口,实现其中的 call()
方法。Callable 接口返回一个 类型为 V 的值:
1 | public interface Callable<V>{ |
将 Callable 对象交给执行器服务的方法如下:
1 | ExecutorSerivce exec = ...; |
上述程序中可以看到,Callable 中 Task 递交给 执行器执行,得到的返回值为 Future<V>
类型(名字意思是会在未来某时得到值)。
Future<V>
类的对象通过 get()
方法得到其中的值。get()
方法的执行依赖于该 Future<T>
对象对应的 Callable<T>
对象的 call()
方法的执行。get()
方法会一直处于阻塞(block)状态直到 得到 excutor 的返回值或者达到规定的时长(timeout has been reached, 此时抛出 TimeoutException
)。而如果 call()
方法抛出异常,get()
方法会抛出一个 ExecutionException
来包裹 call()
方法抛出的异常。
Future<V>
类的对象的 cancel(boolean mayInterruptIfRunning)
方法可以尝试取消 任务。如果任务还没有开始执行,该任务就不会被 excutor service 安排。如果已经开始执行,并且 mayInterruptIfRunning
为 true
,任务会被中断。
d. 一次性向执行器服务传入多个任务
如果一个任务含有多个子任务,我们可以使用 excutor service 的 invokeAll()
方法一次性输入所有的待执行任务,方法如下:
1 | // 统计多个文件中某个单词的出现次数,并求和 |
类似的,excutor 还有 invokeAny()
方法,它会在调用的 任务集合 中任意一个任务执行完毕的时候执行返回。
B. 异步计算
回调函数:就是将函数执行的主动权交给子线程中的任务,当子线程中的任务执行完毕后,子线程会通知主线程,主线程因而可以在第一时间使用子线程的计算结果进行计算。关于回调函数的参考
前面的 Future<V>
对象在等待结果的时候,会阻塞进程。Java 8 中引入了带有回调机制的 Furure 的改进版: CompletableFuture
类。该类通过回调机制,不会阻塞主线程来等待支线程的执行结果。
a. 对象构建
获得 CompletableFuture
类的对象大致有如下几种方法:
- 实例方法返回值为
CompletableFuture
类的对象。例如HttpClient
类的sendAsync()
方法。
1 | HttpClient client = HttpClient.newHttpClient(); |
- 使用
CompletableFuture
类的静态方法supplyAsync()
orrunAsync()
(Ref)
runAsync()
使用 Runnable
接口的对象作为参数,supplyAsync()
使用 Supplier<T>
接口对象作为参数(与 Callable<T>
的区别是 Supplier<T>
不会抛出一个 checked exception)。参考下方代码:
1 | CompletableFuture<String> f = CompletableFuture.supplyAsync( |
Note: 上述的 excutor 参数可以空缺,系统会调用默认的 excutor (excutor returned by ForkJoinPool.commonPool()
)。
b. 常用方法
CompletableFuture
提供了一系列的回调方法用于处理 Callable线程中得到的返回值,常用的有 thenAccept()
, thenApply()
参考下面代码:
1 | // thenAccept(), 返回 void |
CompletableFuture
类还提供了一些方法同时处理多于一个任务的结果。例如: thenCombine()
(同时处理两个任务的结果), allOf
(同时处理多个任务的结果)。
C. 线程安全
线程安全问题主要是由于不同线程中使用了相同的变量,从而可能引起值的更新错误、变量的可见性问题。
a. 可见性问题
参考下方的代码:
1 | private static boolean done = false; |
上述程序的实际效果是,Hello + i 打印1000遍之后,程序仍然无法终止。原因是在 hellos
中对 done 变量的更改无法反映到 goodbye
task中。
我们称这种问题为可见性问题,即 done 这一变量,在task hellos
中的更新 对 goodbye
task不可见。
造成上述问题的原因是 现代编译器、虚拟机会在编译程序时进行优化,部分优化措施可能引发上述问题。可能引发问题的优化主要包括下述两个方面:
- 由于内存相对处理器,读写速度太慢,所以虚拟机在执行程序时会将用到的变量数据 载入到高速缓存(cache)中,以提升执行效率。但这也造成了实际程序使用的是原数据的一份拷贝,当处理完毕后才会写回内存。
- 编译器会在不影响实际程序语义逻辑前提下,对执行步骤进行重排列。例如,声明两个 变量后对他们求和,处理器可能颠倒声明的顺序,或者将变量声明的过程做并行处理。
例如上面的 while(!done) i++;
,由于 while 循环中并不修改 done 的值,编译器在优化过程中可能会对代码执行顺序进行重排,从而前述代码编译后为 if(!done) while(true) i++;
。
通常有如下几种方案处理可见性问题,可以使得变量的更改对所有tasks可见:
- final 类型的变量在 初始化后,对所有线程可见;
- static 变量在初始化后,对所有线程可见;
- volatile 类型的变量的变化对所有线程可见;
- 可通过同步锁(lock)实现变量的安全更新。
对于上述简单的问题,可以通过声明 done 变量为 volatile 类型来处理:private static voltaile boolean done;
。
b. 冒险竞争问题(race condition)
当有多个tasks对同一个变量进行更改,可能出现 race condtion 问题,如下:
1 | private static voltaile int count = 0; |
上述代码可能发生问题,因为,实际上 count ++
在实际的程序执行中,并不是最小的执行单元(operation is not atomic)。编译器会将其分为两步来执行:
1 | register = count + 1; |
因而,上述的两个tasks更新 count 的值的程序,可能发生如下问题:
1 | int count = 0; // intial value |
可以发现,虽然两个task都执行了一次 count++
, 但是 count 的值并没有如预期变为2,它的值仍然是1。
如果想上述的程序变为线程安全的,则必须保证 count++
在任意一个task中都能够完整的顺序执行完毕(这一部分的代码也称为 critical section),构成一个等效的 operation atomic。这一思路,可以通过添加 lock 进行保护,得以实现。
c. 安全的并发编程策略
除了上面的数据可见性,安全性问题,对于 Java 来说,在并发程序中被分享使用的数据 也无法通过系统的垃圾回收机制进行自动的回收。
为了解决上述问题,我们提供下述几种策略:
- Confinement。限制不同 task 之间共享数据。例如上面的计数程序,可以的做法就是仅仅在一个 task 中做计数处理,直到计数task处理完毕后,将结果交由另一个 task 处理。
- Immutability。在不同的任务之间 分享 不可更改的对象是安全的。例如通过一个task 生成一个不可更改的 数据 collection,然后在另一个 task 中将第一个task 生成的数据 collection 吸收组合生成另一个不可更改的 数据 collection。一些具体的技巧包括:
- 在对象 construction 之后就不再进行更改,可以将实例对象声明为 final 类型;
- 不要 泄露可以更改的状态,例如通过 non-private 方法可以更改对象的一些内部状态,需要禁止这一行为;
- 通过 传递对象的 copy 来实现内容的传递,而不是直接传递引用。
- Locking。通过 lock 可以保证数据一次仅仅能被一个任务获取。在 Java 中,Java 并发库(concurrency library)中提供了一些能够保证并发操作安全的数据结构。此外,我们也可以自己通过 lock 来实现并发安全的类。
- Note:在可能的情况下,并不推荐使用 locking。首先,它很容易发生错误,其次它强制部分程序不能并发执行,也会降低了并发程序的优势,降低程序效率。
- Partition。我们可以将数据分块处理,每一块数据可以并发的操作,Java 中的 streams 库就是使用的这个思路。
D. Java 库的并行运算支持
a. 流和数组的并行运算
Java 的 stream 库以及 Arrays 类都在内部集成了并行算法。
Streams
例子如下:
1 | long result = coll.parallelStream().filter(s -> s.startWith("A").count()); |
具体的,在实现中,stream会被分割为多段,分别进行 filter 操作,并在最后将结果进行组合。
对于 parallel 的流操作,需要注意以下几点:
- 并行流操作适用于数据量较大,或者计算量较大的场景,对于小数据量的场景,无法体现出其优势,没有必要使用 parallelStream;
- 数据需要存放在内存中;
- 流需要能够被有效的进行分割,具体的,对于背后使用 array或者 平衡二叉树的流,并行算法可以很好的提升效率;而对于背后是 链表实现,或是 Stream.iterate 生成的流,则不适合。
Arrays
例子如下:
1 | Arrays.parallelSetAll(values, i -> i%10); |
类似的,数组的并行处理也是通过分块来实现的。
其他的并行算法还包括 parallelSort()
, parallelPrefix()
。
我们也可以将数组变为 parallel stream 来实现更加复杂的并行处理:
1 | long sum = IntStream.of(values).parallel().sum(); |
b. Java 提供的线程安全的数据结构
在 Java 的 java.util.concurrent
包中,提供了一系列线程安全的集合类。
一个需要注意的是,对于这些线程安全的集合类,他们提供的 iterators 具有 弱一致性(weekly consistent),也就是说,迭代器(iterator)中记录的元素是 construct 该迭代器时集合中的元素,后续对集合进行的更改不一定(may or may not)会在迭代器中进行反应。
需要注意的是,线程安全的类并不是所有的方法都是线程安全的,想要保障线程安全,需要调用他们的特定方法。下面介绍一些具体的线程安全的集合类。
并发 Hash Map
ConcurrentHashMap
是线程安全的 hash map。它通过提供下述更新集合元素的方法来实现安全的多线程操作:
1 | ConcurrentHashMap<String, long> map = new ConcurrentHashMap<>(); |
此外,还有 putIfAbsent()
, computeIfAbsent()
, computeIfPresent()
等方法来实现线程安全的操作。
对于上述线程安全的方法,实现的原理是 在程序中 上述方法会被当做原子型方法来处理(atomic),也就是执行上述方法的时候,不会被其他线程打断。
由于上述方法为 atomic,所以在上述方法中传入的计算过程不能太长,不能为长耗时工作,否则会严重影响系统效率。
除了单个元素的更新查询方法, ConcurrentHashMap
还提供一些大型的操作方法,包括 search, reduce, forEach 方法,这些方法实现线程安全的原理是获得一个元素集合的 snapshot,然后在获取的这个 snapshot 上进行执行。
需要注意的是,对于 ConcurrentHashMap
类,同样有 put() 等与 HashMap
相同的方法, 但是这些方法不是线程安全的方法。
阻塞队列
在 Java.util.concurrent 包中,还提供了线程安全的队列类型,为 阻塞队列(blocking queue),具体包括实现类 LinkedBlockingQueue
(利用链表实现), ArrayBlockingQueue
(利用循环数组实现)。
阻塞队列的工作原理是,生产者(producer)负责向队列中添加元素,消费者(consumer)从队列中取用元素。当队列满或者队列为空时,队列的操作会被阻塞。队列通过这样的方法来实现生产和消耗的平衡。具体的方法参考书中表格10-3。
使用阻塞队列的一个注意点是设定好 consumer 终止的条件。一个方法是添加 last item 的指示符,当consumer 读取到 last item 的指示符后,consumer 便不再等待而阻塞队列。
其他线程安全的集合类
ConcurrentSkipListMap
是一个并发安全的,且可以按照顺序遍历 key 值的 Map 类(类似 HashMap 和 TreeMap)。
CopyOnWriteArrayList
, CopyOnWriteArraySet
通过让所有的执行元素变化的操作 复制 这个集合的方法来实现线程安全。
没有直接的线程安全的 set 提供,但是可以通过 ConcurrentHashMap
的静态方法 newKeySet()
, 构造线程安全的 set。
1 | Set<String> words = ConcurrentHashMap.newKeySet(); |
newKeySet()
构建的 Set<K>
实际上是一个被包裹了的 ConcurrentHashMap<K, Boolean>
, 所有的 key 值都设为 Boolean.TRUE
。
原子型的计数器
特别的,对于 计数器,java.util.concurrent.atomic 包中提供了一些线程安全的计数器。
AtomicLong
, 保证一个线程安全的 类似long 型整数的值。一些方法举例如下:
1 | // 更新,累加1 incrementAndGet() |
AtomicLong
还有方法包括 getAndUpdate()
(返回更新前的值), getAndAccumulate()
(返回更新前的值) …
对于有大量线程同时使用 atomic 值的情况,系统的性能会由于 实现的 乐观性变得低效(理解(may not right):对数进行更新时的逻辑是这样的,首先获取数 A 的值,然后计算更新先放在临时B处,此时,比较之前获取的A的值与现在的A 值是否相同,如果相同,说明没有线程冲突,将B中的值更新到 A 中,否则,重复上面的行为)。为解决上述问题,可以使用 LongAdder
, LongAccumulator
, DoubleAdder
, DoubleAccumulator
作为累加器。
LongAdder
等的实现原理是通过为不同task 提供不同的数块用于各自的更新,最后求和来得到总的累加值。
LongAdder
的方法有:increment()
, 加一操作;add()
, 加一个特定值;sum()
, 求和,即求所有线程的累加值。
LongAccumulator
的方法有:accumulate()
加操作; get()
获取当前值。