Categories
程式開發

任务级并发与 Executor 框架


对于多数提供操作系统级别抽象映射的程序设计语言来说,并发程序的基本复用单位是线程。在大多数现代操作系统中,线程是基本的调度单位。在没有明确的协同机制的情况下,线程将彼此独立执行。同时,同一线程同一时刻只能执行一个动作,这意味着同一线程内代码的串行执行是可以得到保障的。

然而,由线程倒逼出应用程序的逻辑拆分是不合实际的。通常,应用程序都是围绕【任务执行】来构造的。任务是抽象的、离散的工作单元。通过把应用程序的工作拆分到多个任务中,可以简化程序的组织结构,并在可能的情况下提升应用程序的性能。

Java 为任务执行抽象出 Executor 接口,其定义如下。

public interface Executor {
void execute(Runnable command);
}

可以看到,一个 Runnable 接口的实现类作为一个任务被传入到任务执行器 Executor 中,这就是任务执行的 Executor 框架的基本抽象。

任务执行的模型不依赖于底下的线程模型,但是在 Java 的世界中,任务实际执行的地方就是线程。我们先看到任务执行与线程的对应关系的几种情况以及其优缺点。

如同 JDK 文档所举的例子,虽然本文讨论的是任务级并发,但是任务执行本身并不意味着并发。一个简单的例子就是所有的任务执行都在当前线程上执行,其对应的 Executor 实现大致如下。

public class DirectExecutor implements Executor {
@Override
public void execute(Runnable command) {
command.run();
}
}

这在任务执行的策略中可以称为串行的执行策略。概念上,它是一个合法的任务执行策略,并且实现简单,容易理解。但是显而易见的,它在高负载的情况下性能会有很大的问题。例如,对于一个网站服务器来说,这意味着同时到来大量的请求的同时,后面的请求必须等待前面请求的完成才能得到处理。由于延迟的传递性,越往后的请求等待时间是近乎线性累加的,这显然不符合网站及时响应用户请求的需求。

因此,我们考虑将请求分摊给不同的线程来处理。一种自然映射是为每个请求即每个任务显式地创建一个工作线程,其对应的 Executor 实现大致如下。

public class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable command) {
new Thread(command).start();
}
}

通过为每个任务分配一个线程,能够保证任务的执行对于处理任务提交的线程来说是瞬时的,实际的工作将在新的线程当中执行,并通过某种形式的同步传递结果。这种方案也是实现简单且容易理解的。但是对系统级线程有所了解的开发者能够很快发现在高负载的情况下这一方案的问题。

系统级线程的生命周期的开销非常高。这里一行创建线程的代码,不同于变量的赋值,底下隐含的开销是重量级的。根据操作系统的不同,线程创建的开销也不尽相同,但都有远超寄存器操作的时间,同时需要 JVM 和操作系统提供一定的辅助操作。这将会延迟任务本身被处理的时间。如果请求本身是轻量级的,那么请求所需的执行开销和为了执行请求创建线程的开销的比值将会非常悬殊。系统级线程的申请和维护将占用资源。活跃的线程会消耗系统资源,尤其是内存资源。如果可运行的线程数量多于可用处理器的数量(这个值通常是计算机配置中的核心数),那么有些线程将被闲置。闲置的线程会占用内存,而且 CPU 调度时大量线程竞争 CPU 资源将会带来其他的性能开销。因此,如果线程数量已经能够打满核心,再创建更多的线程反而会降低性能。最后,由于上述原因,且 JVM 实例是按照进程来隔离的,同一进程的不同线程运行在同一个 JVM 实例上,资源枯竭触发系统或虚拟机的保护机制,例如 OutOfMemoryError 异常的抛出,将会挂掉整个 JVM 实例,导致所有的线程一起受到影响。

从上面两个例子中,我们看到,无论是只用一个线程串行地对付所有任务,还是每个任务创建一个线程去处理,都有显而易见的问题。计算机科学讲究的是折中(trade-off),那么有没有这两个方案的折中方案呢?

既然 1:1 和 1:n 都不行,那么一个自然的想法就是尝试 m:n 的模式。也就是说,对于给定的任务集,创建给定的线程集来处理。实践中,这样的线程集称为线程池(ThreadPool)。

从任务的视角看,Executor 框架满足生产者-消费者模型。调用 Executor#execute 即生产一个任务,而线程池在底下分配一个线程执行任务,即作为任务的消费者。从实现上看,连接这两者的是工作队列。任务被提交的时候由工作队列接收,线程池中的工作线程从工作队列中取得任务并执行。

另一方面,虽然我们在上面的描述中采用了【创建】线程池来描述任务和线程的对应关系,但是实际上由于线程被线程池所管理,执行完任务的空闲线程能够被复用以执行其他任务。显然,由于不需要重新创建线程,在处理高负载下的海量任务时,可以复用一开始就创建好的若干个线程逐个对付,这相对于每个任务一个线程的模式将大大减少线程创建的开销。

Executor 框架的引入使得任务的执行策略和任务本身解耦,从上面的例子来说,我们无需在提交任务的时候连带着指定用什么策略来执行它,而是将这一决策封装在 Executor 接口的实现当中。这里所说的执行策略定义了任务执行的各个细节。

任务在哪个线程中执行?前面提到,至少在 Java 的世界中,任务最终是要在一个线程上执行的。在上面的例子中,这可以是当前线程,或者一个新的线程,或者线程池中的线程。任务按照什么顺序执行?最多允许并发执行多少个任务?最多允许同时等待多少个任务?如果出于某种原因(例如资源限制)需要拒绝某个任务,应该如何选出这个任务?如果出于某种原因(例如资源限制)需要拒绝某个任务,应该如何通知这个任务?任务执行前后应该进行哪些动作?

后面几个问题对于两种退化情况来说是显而易见或不相关的,不过在不同的线程池实现当中,就要好好考虑这些问题了。

Java 开箱即用的提供了若干个线程池的实现,大多数通过 Executors 实用工具类能够取得。

虽然 Executors 提供了很多个方便的接口,但是实际上不同的配置底下是相同的两个线程池实现。其之一是最早的 ThreadPoolExecutor 实现,其之二是 ForkJoinPool 实现。

ThreadPoolExecutor 为一些 Executor 提供了基本的实现,包括 Executors 中的 newCachedThreadPool、newFixedThreadPool 和 newScheduledThreadExecutor 等。 ThreadPoolExecutor 是一个灵活的、稳定的线程池,允许进行各种定制。它的通用构造函数如下所示。

public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);

其中线程池的基本大小(corePoolSize)、最大大小(maximumPoolSize)和存活时间(keepAliveTime)等因素共同负责线程的创建和销毁。基本大小也就是线程池的大小,即在没有任务执行时的线程池大小,并且只有在工作队列满的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的线程,在线程池的当前大小超过基本大小时,将被回收。

我们可以看到在这里线程池中的线程不是固定不变的。即使对于 newFixedThreadPool 这样基本大小等于最大大小的线程池实例,如果其中某个线程由于异常退出,那么在它退出之后,线程池也会补齐线程数量到基本大小。也就是说,线程池中的线程应该视为无状态的通用线程,而不是一个确定的线程对象。进一步的,ThreadLocal 以及其他依赖固定线程对象的任务必须小心应对线程池的情况。要么使用单独的线程对象,要么改造以移除对线程对象的依赖。

前面我们提到,Executor 框架可以视为一个生产者-消费者模型的实现,其中任务生产和消费的缓冲区即工作队列。ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务,这个参数也出现在它的通用构造函数中。我们在这里仅提及这个事情以及使用有界队列时需要配合饱和策略即前文所述任务执行策略中拒绝任务的部分。关于工作队列的分类和特点在后续文章讲到 Actor 模型的 Akka 实现时再做展开。

另外值得一提的是线程池支持传入一个线程工厂来定制任务到线程的映射,通常这会被用于在创建线程之后定制线程的名字和注入指标监控工具等。

限于篇幅和时间,这里对剩下的内容做一个快速的罗列,后续再进行展开。

对于线程池的部分,还有两个值得注意的线程池。ForkJoinPool 线程池实现了工作窃取(work stealing)的工作队列,它提供一个静态的公共线程池,这个线程池是 CompletableFuture 异步执行时默认的线程池。ScheduledThreadPoolExecutor 作为 ThreadPoolExecutor 的子类实现了延期任务和定期任务的调度,同时展示了如何子类化 ThreadPoolExecutor 以实现更丰富的功能。

对于 Executor 的部分,当它实际持有线程的时候,只要持有的时间大于任务执行的时间,即线程的生命周期不受限于任务的生命周期,该实现类就应该实现 ExecutorService 接口并提供生命周期方法。这些生命周期方法将会协同处于不同状态的任务,具体可以和 Actor 模型的监督机制下的 Actor 退出相结合讨论。

对于接口的部分,目前与 Executor 框架相关的接口包括 Callable、Future 和 ComplatableFuture 以及后两者对应的异常体系等。这些接口的设计包括和它们在其他语言/三方库中的对等实现特别是对比函数式的实现是一个有趣的单独的主题。

最后,作为逸闻,引用 Microsoft 介绍异步编程的这篇文章“。其中有介绍 C# 世界里任务级并发的姿势,并且任务级并发在 C# 的世界里也击败了其他的模式,成为推荐的并发程序设计模式。