Java-10_1-并发编程

并发编程适用于执行时需要长等待的任务(例如网络任务),通过并发编程,可以让这类任务在等待时让出处理器。现今,大多数计算系统已为多处理器或多核系统,并发编程的任务则是尽量让所有的处理器核心都保持忙碌,而不是停住等待耗时程序。

本书只会介绍应用层的一些实现技巧,对于更加底层的并发实现,推荐书籍 Java Concurrency in Practice (Author: Brian Goetz, et.al)。

A. 并发任务基础

在计算机系统中,线程(thread)是由操作系统提供的最小的 顺序执行程序 的机制,从底层上,一般多个线程并发执行,通过使用不同的处理器/处理器核心 或者 同一核心的不同时间片段来实现。

a. 执行器服务

Java 会通过 executor service 来自动安排程序执行所使用的 线程。常用的有两种执行器服务:

1
2
3
4
5
6
7
8
// newCachedThreadPool() 没有线程数量限制
// 当所有线程都在忙碌时自动生成新的线程
// 适合大量短生命周期,长等待的小型任务。
ExecutorService exec = Executors.newCachedThreadPool();

// newFixedThreadPool() 规定线程池的线程数量
// 适合计算密集型程序,或者需要限制资源消耗的服务
ExecutorService exec = Executors.newFixedThreadPool(nthreads);

使用固定线程数量时,线程的数量可以参考系统可供使用的处理器个数,可以通过下述方案获的该值:

1
int processors = Runtime.getRuntime().availableProcessors();

b. Runnable 接口

在 Java 中,想让程序能够并发执行,可以让待执行的任务所在的类实现 Runnable 接口,并将需要并发执行的任务写在 Run() 方法中。具体操作如下:

1
2
3
4
// Runnable 为 functional interface,可以使用 lambda 表达式
Runnable task = () -> {....};
ExecutorService exec = ...;
exec.excute(task);

c. Callable 接口

Runnable 执行任务时没有返回值,如果想执行并发任务时拥有返回值,则可以使用 Callable<V> 接口,实现其中的 call() 方法。Callable 接口返回一个 类型为 V 的值:

1
2
3
public interface Callable<V>{
V call() throws Exception;
}

将 Callable 对象交给执行器服务的方法如下:

1
2
3
ExecutorSerivce exec = ...;
Callable<V> task = ...;
Future<V> result = excutor.submit(task);

上述程序中可以看到,Callable 中 Task 递交给 执行器执行,得到的返回值为 Future<V> 类型(名字意思是会在未来某时得到值)。

Future<V> 类的对象通过 get() 方法得到其中的值。get() 方法的执行依赖于该 Future<T> 对象对应的 Callable<T> 对象的 call() 方法的执行。get() 方法会一直处于阻塞(block)状态直到 得到 excutor 的返回值或者达到规定的时长(timeout has been reached, 此时抛出 TimeoutException)。而如果 call() 方法抛出异常,get() 方法会抛出一个 ExecutionException 来包裹 call() 方法抛出的异常。

Future<V> 类的对象的 cancel(boolean mayInterruptIfRunning) 方法可以尝试取消 任务。如果任务还没有开始执行,该任务就不会被 excutor service 安排。如果已经开始执行,并且 mayInterruptIfRunningtrue,任务会被中断。

d. 一次性向执行器服务传入多个任务

如果一个任务含有多个子任务,我们可以使用 excutor service 的 invokeAll() 方法一次性输入所有的待执行任务,方法如下:

1
2
3
4
5
6
7
8
9
10
// 统计多个文件中某个单词的出现次数,并求和
String word = ...;
Set<Path> paths = ...;
List<Callable<long>> tasks = new ArrayList<>();
for(Path p : paths) tasks.add(
() -> {return number of ocurrences of word in p});
List<Future<long>> results = executor.invokeAll(tasks);
// invoke() 操作会一直阻塞,直到所有任务执行完毕
long total = 0;
for(Future<long> result : results) total += result.get();

类似的,excutor 还有 invokeAny() 方法,它会在调用的 任务集合 中任意一个任务执行完毕的时候执行返回。

B. 异步计算

回调函数:就是将函数执行的主动权交给子线程中的任务,当子线程中的任务执行完毕后,子线程会通知主线程,主线程因而可以在第一时间使用子线程的计算结果进行计算。关于回调函数的参考

前面的 Future<V> 对象在等待结果的时候,会阻塞进程。Java 8 中引入了带有回调机制的 Furure 的改进版: CompletableFuture 类。该类通过回调机制,不会阻塞主线程来等待支线程的执行结果。

a. 对象构建

获得 CompletableFuture 类的对象大致有如下几种方法:

  1. 实例方法返回值为 CompletableFuture 类的对象。例如 HttpClient 类的 sendAsync() 方法。
1
2
3
4
5
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder(new URI(urlString)).GET()
.build();
CompletableFuture<HttpResponse<String>> f = client.sendAsync(
request, BodyHandler.asString());
  1. 使用 CompletableFuture 类的静态方法 supplyAsync() or runAsync()Ref

runAsync() 使用 Runnable 接口的对象作为参数,supplyAsync() 使用 Supplier<T> 接口对象作为参数(与 Callable<T> 的区别是 Supplier<T> 不会抛出一个 checked exception)。参考下方代码:

1
2
3
CompletableFuture<String> f = CompletableFuture.supplyAsync(
() -> {String result; Compute the result...; return reslut;}),
excutor);

Note: 上述的 excutor 参数可以空缺,系统会调用默认的 excutor (excutor returned by ForkJoinPool.commonPool())。

b. 常用方法

CompletableFuture 提供了一系列的回调方法用于处理 Callable线程中得到的返回值,常用的有 thenAccept(), thenApply() 参考下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
// thenAccept(), 返回 void
CompletableFuture<String> f = ...;
f.thenAccept((String s) -> process the result s...);

// thenApply(), T->U, 返回 U 类型
// readPage() 函数,返回 CompletableFuture<String> 类型的值
// 该例子返回网页内容中的 网址链接
CompletableFuture<String> contents = readPage(url);
CompletableFuture<List<URI>> links = contents.thenApply(Parser::getLinks);

// f为函数,处理 future对象 CompletableFuture<T>
// thenApplyAsync(f), 让 f 在另一个线程中执行
CompletableFuture<U> future.thenApplyAsync(f);

CompletableFuture 类还提供了一些方法同时处理多于一个任务的结果。例如: thenCombine() (同时处理两个任务的结果), allOf (同时处理多个任务的结果)。

C. 线程安全

线程安全问题主要是由于不同线程中使用了相同的变量,从而可能引起值的更新错误、变量的可见性问题。

a. 可见性问题

参考下方的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static boolean done = false;

// 预期目标:goodbye 程序在 hellos 打印 1000次 "Hello i" 之前
// 一直累加 数字。最后在 goodbye 执行完毕后,打印 "Goodbye i".
public static void main(String[] args){
Runnable hellos = () -> {
for(int i = 1; i < 1000; i++)
System.out.println("Hello " + i);
done = true;
};
Runnable goodbye = () -> {
int i = 1;
while(!done) i++;
System.out.println("Goodbye " + i);
};
Executor executor = Executors.newCachedThreadPool();
executor.execute(hellos);
executor.execute(goodbye);
}

上述程序的实际效果是,Hello + i 打印1000遍之后,程序仍然无法终止。原因是在 hellos 中对 done 变量的更改无法反映到 goodbye task中。

我们称这种问题为可见性问题,即 done 这一变量,在task hellos 中的更新 对 goodbye task不可见。

造成上述问题的原因是 现代编译器、虚拟机会在编译程序时进行优化,部分优化措施可能引发上述问题。可能引发问题的优化主要包括下述两个方面:

  1. 由于内存相对处理器,读写速度太慢,所以虚拟机在执行程序时会将用到的变量数据 载入到高速缓存(cache)中,以提升执行效率。但这也造成了实际程序使用的是原数据的一份拷贝,当处理完毕后才会写回内存。
  2. 编译器会在不影响实际程序语义逻辑前提下,对执行步骤进行重排列。例如,声明两个 变量后对他们求和,处理器可能颠倒声明的顺序,或者将变量声明的过程做并行处理。

例如上面的 while(!done) i++; ,由于 while 循环中并不修改 done 的值,编译器在优化过程中可能会对代码执行顺序进行重排,从而前述代码编译后为 if(!done) while(true) i++;

通常有如下几种方案处理可见性问题,可以使得变量的更改对所有tasks可见:

  1. final 类型的变量在 初始化后,对所有线程可见;
  2. static 变量在初始化后,对所有线程可见;
  3. volatile 类型的变量的变化对所有线程可见;
  4. 可通过同步锁(lock)实现变量的安全更新。

对于上述简单的问题,可以通过声明 done 变量为 volatile 类型来处理:private static voltaile boolean done;

b. 冒险竞争问题(race condition)

当有多个tasks对同一个变量进行更改,可能出现 race condtion 问题,如下:

1
2
3
4
5
6
private static voltaile int count = 0;
...
count ++; // Task 1
...
count ++; // Task 2
...

上述代码可能发生问题,因为,实际上 count ++ 在实际的程序执行中,并不是最小的执行单元(operation is not atomic)。编译器会将其分为两步来执行:

1
2
register = count + 1;
count = register;

因而,上述的两个tasks更新 count 的值的程序,可能发生如下问题:

1
2
3
4
5
int count = 0;    // intial value
register1 = count + 1; // register1 == 1
register2 = count + 1; // register2 == 1
count = register2;
count = register1;

可以发现,虽然两个task都执行了一次 count++, 但是 count 的值并没有如预期变为2,它的值仍然是1。

如果想上述的程序变为线程安全的,则必须保证 count++ 在任意一个task中都能够完整的顺序执行完毕(这一部分的代码也称为 critical section),构成一个等效的 operation atomic。这一思路,可以通过添加 lock 进行保护,得以实现。

c. 安全的并发编程策略

除了上面的数据可见性,安全性问题,对于 Java 来说,在并发程序中被分享使用的数据 也无法通过系统的垃圾回收机制进行自动的回收。

为了解决上述问题,我们提供下述几种策略:

  1. Confinement。限制不同 task 之间共享数据。例如上面的计数程序,可以的做法就是仅仅在一个 task 中做计数处理,直到计数task处理完毕后,将结果交由另一个 task 处理。
  2. Immutability。在不同的任务之间 分享 不可更改的对象是安全的。例如通过一个task 生成一个不可更改的 数据 collection,然后在另一个 task 中将第一个task 生成的数据 collection 吸收组合生成另一个不可更改的 数据 collection。一些具体的技巧包括:
    1. 在对象 construction 之后就不再进行更改,可以将实例对象声明为 final 类型;
    2. 不要 泄露可以更改的状态,例如通过 non-private 方法可以更改对象的一些内部状态,需要禁止这一行为;
    3. 通过 传递对象的 copy 来实现内容的传递,而不是直接传递引用。
  3. Locking。通过 lock 可以保证数据一次仅仅能被一个任务获取。在 Java 中,Java 并发库(concurrency library)中提供了一些能够保证并发操作安全的数据结构。此外,我们也可以自己通过 lock 来实现并发安全的类。
    1. Note:在可能的情况下,并不推荐使用 locking。首先,它很容易发生错误,其次它强制部分程序不能并发执行,也会降低了并发程序的优势,降低程序效率。
  4. Partition。我们可以将数据分块处理,每一块数据可以并发的操作,Java 中的 streams 库就是使用的这个思路。

D. Java 库的并行运算支持

a. 流和数组的并行运算

Java 的 stream 库以及 Arrays 类都在内部集成了并行算法。

Streams

例子如下:

1
long result = coll.parallelStream().filter(s -> s.startWith("A").count());

具体的,在实现中,stream会被分割为多段,分别进行 filter 操作,并在最后将结果进行组合。

对于 parallel 的流操作,需要注意以下几点:

  1. 并行流操作适用于数据量较大,或者计算量较大的场景,对于小数据量的场景,无法体现出其优势,没有必要使用 parallelStream;
  2. 数据需要存放在内存中;
  3. 流需要能够被有效的进行分割,具体的,对于背后使用 array或者 平衡二叉树的流,并行算法可以很好的提升效率;而对于背后是 链表实现,或是 Stream.iterate 生成的流,则不适合。

Arrays

例子如下:

1
Arrays.parallelSetAll(values, i -> i%10);

类似的,数组的并行处理也是通过分块来实现的。

其他的并行算法还包括 parallelSort(), parallelPrefix()

我们也可以将数组变为 parallel stream 来实现更加复杂的并行处理:

1
long sum = IntStream.of(values).parallel().sum();

b. Java 提供的线程安全的数据结构

在 Java 的 java.util.concurrent 包中,提供了一系列线程安全的集合类。

一个需要注意的是,对于这些线程安全的集合类,他们提供的 iterators 具有 弱一致性(weekly consistent),也就是说,迭代器(iterator)中记录的元素是 construct 该迭代器时集合中的元素,后续对集合进行的更改不一定(may or may not)会在迭代器中进行反应。

需要注意的是,线程安全的类并不是所有的方法都是线程安全的,想要保障线程安全,需要调用他们的特定方法。下面介绍一些具体的线程安全的集合类。

并发 Hash Map

ConcurrentHashMap 是线程安全的 hash map。它通过提供下述更新集合元素的方法来实现安全的多线程操作:

1
2
3
4
5
ConcurrentHashMap<String, long> map = new ConcurrentHashMap<>();
// compute 方法,利用key值 word 更新 map中的条目
map.compute(word, (k, v) -> v == null ? 1 : 1 + v);
// merge 方法,更新 key 值为 word 的条目
map.merge(word, 1L, Long::sum);

此外,还有 putIfAbsent(), computeIfAbsent(), computeIfPresent() 等方法来实现线程安全的操作。

对于上述线程安全的方法,实现的原理是 在程序中 上述方法会被当做原子型方法来处理(atomic),也就是执行上述方法的时候,不会被其他线程打断。

由于上述方法为 atomic,所以在上述方法中传入的计算过程不能太长,不能为长耗时工作,否则会严重影响系统效率。

除了单个元素的更新查询方法, ConcurrentHashMap 还提供一些大型的操作方法,包括 search, reduce, forEach 方法,这些方法实现线程安全的原理是获得一个元素集合的 snapshot,然后在获取的这个 snapshot 上进行执行。

需要注意的是,对于 ConcurrentHashMap 类,同样有 put() 等与 HashMap 相同的方法, 但是这些方法不是线程安全的方法。

阻塞队列

在 Java.util.concurrent 包中,还提供了线程安全的队列类型,为 阻塞队列(blocking queue),具体包括实现类 LinkedBlockingQueue(利用链表实现), ArrayBlockingQueue(利用循环数组实现)。

阻塞队列的工作原理是,生产者(producer)负责向队列中添加元素,消费者(consumer)从队列中取用元素。当队列满或者队列为空时,队列的操作会被阻塞。队列通过这样的方法来实现生产和消耗的平衡。具体的方法参考书中表格10-3。

使用阻塞队列的一个注意点是设定好 consumer 终止的条件。一个方法是添加 last item 的指示符,当consumer 读取到 last item 的指示符后,consumer 便不再等待而阻塞队列。

其他线程安全的集合类

ConcurrentSkipListMap 是一个并发安全的,且可以按照顺序遍历 key 值的 Map 类(类似 HashMap 和 TreeMap)。

CopyOnWriteArrayList, CopyOnWriteArraySet 通过让所有的执行元素变化的操作 复制 这个集合的方法来实现线程安全。

没有直接的线程安全的 set 提供,但是可以通过 ConcurrentHashMap 的静态方法 newKeySet(), 构造线程安全的 set。

1
Set<String> words = ConcurrentHashMap.newKeySet();

newKeySet()构建的 Set<K> 实际上是一个被包裹了的 ConcurrentHashMap<K, Boolean>, 所有的 key 值都设为 Boolean.TRUE

原子型的计数器

特别的,对于 计数器,java.util.concurrent.atomic 包中提供了一些线程安全的计数器。

AtomicLong, 保证一个线程安全的 类似long 型整数的值。一些方法举例如下:

1
2
3
4
5
6
7
8
// 更新,累加1  incrementAndGet()
public static AtomicLong nextNumber = new AtomicLong();
// In some thread ...
long id = nextNumber.incrementAndGet(); // nextNumber + 1,并返回更新后的值

// 对于更加复杂的更新策略,使用 accumulateAndGet()
public static AtomicLong largest = new AtomicLong();
largest.accumulateAndGet(x -> Math.max(x, observed)); // 将observed 与 x 中较大的值返回给 largest

AtomicLong 还有方法包括 getAndUpdate()(返回更新前的值), getAndAccumulate()(返回更新前的值) …

对于有大量线程同时使用 atomic 值的情况,系统的性能会由于 实现的 乐观性变得低效(理解(may not right):对数进行更新时的逻辑是这样的,首先获取数 A 的值,然后计算更新先放在临时B处,此时,比较之前获取的A的值与现在的A 值是否相同,如果相同,说明没有线程冲突,将B中的值更新到 A 中,否则,重复上面的行为)。为解决上述问题,可以使用 LongAdder, LongAccumulator, DoubleAdder, DoubleAccumulator 作为累加器。

LongAdder 等的实现原理是通过为不同task 提供不同的数块用于各自的更新,最后求和来得到总的累加值。

LongAdder 的方法有:increment(), 加一操作;add(), 加一个特定值;sum(), 求和,即求所有线程的累加值。

LongAccumulator 的方法有:accumulate()加操作; get() 获取当前值。