Categories
程式開發

Ray 分布式计算框架详解


Ray 是 UC Berkeley RISELab 出品的机器学习分布式框架。UC Berkeley 教授 Ion Stoica 写了一篇文章:The Future of Computing is Distributed“。里面详细说了 Ray 产生的原由。总结一下,就是由于 AI 和大数据的快速发展,对于应用和硬件能力的要求提出了更高的挑战。需要有更适合的软件架构来匹配大规模实时的计算需求。

Ion Stoica 同时也是 Spark 产品的公司 Databricks 的创始人,Apache Mesos、Alluxio、Clipper 的项目主导人

Ray 的特点

我们从了解 Ray 到实践上线,大概有1年半的时间。为什么会找到 Ray ,主要还是基于高性能计算的需求。我们的场景主要是做投资的实时归因分析,由于涉及的数据量很大,对于计算的要求也很高。还有一个关键的门槛,金融模型大量使用 Pandas 和 Numpy 来做矩阵计算,需要针对 Pandas/Numpy 有更好的支持。实践下来,我觉得 Ray 有如下特点:

分布式异步调用内存调度Pandas/Numpy 的分布式支持支持 Python整体性能出众

存在很多和 Ray 类似的框架,但是如果把范围缩小,针对 python 用户,类似的主要就是 Dask、Mars、Celery 等 。

和 Dask 对比

Dask 是 Anaconda 的产品,背后的主要贡献者是 Matthew Rocklin 。Dask 的 blog” 和 Matthew Rocklin 的 Blog” 可以经常去看看,更新很频繁,内容很不错。最近 Matthew Rocklin 加入了 Nvidia ,开始做Dask_cudf,开发基于 GPU 的 Dask,结合 Rapids cudf“(基于 GPU 的 Pandas)。最新的 blog 里面,显示他准备建立一个 Dask 的公司“,推进 python 的分布式数据平台

Dask 和核心是弥补 python 在数据科学中的不足,主要是性能上。Python 单机的能力不能够支持数据科学中大数据集的快速计算。

Dask 提供了基础的数据结构,底层是分布式计算架构。数据结构包括:Array 、Dataframe。Array 兼容 numpy 的ndarray,Dataframe 兼容 Pandas 的 Dataframe 。”兼容”是个相对的说法,毕竟 Pandas 和 Numpy 发展多年,本身也在发展,接口非常多。Dask 应该是在兼容这块做的非常好的,但是和 Pandas/Numpy 还是有差异,这一点要注意。

就像刚才提到的,Dask 的目标是为了弥补 Python 在数据科学上的不足。而 Ray 的出发点是为了加速机器学习的调优和训练的速度。Ray 除了基础的计算平台,还包括 Tune (超参数调节) 和 RLlib (增强学习)

因为数据科学和机器学习基础都是 Python 也是核心,所以分布式和对 Python Pandas/Numpy 的支持是这两个框架的基础。但是有一点不一样的,也是我们决定要采用 Ray 的核心原因。Ray 的底层内存数据结构的基础是 Apache Arrow,而 Dask 是 xarray/xray 。xray 是 NumFocus 赞助的开源类似 numpy 的数据结构。Apache Arrow 拥有更好的生态,被大部分的数据处理系统接受,非常有利于和其他系统的融合

Apache Arrow 和 Plasma

Apache Arrow 是列式内存数据结构,已经成为数据处理领域最通用的数据结构。Arrow 最突出的特点就是生态非常丰富和性能出众

Arrow 的背后还有一位 committer 和 co-creator,叫 Wes McKinney” 。Wes McKinney 是 pandas 的作者,所以 Arrow 对 Pandas 的支持也非常好

Ray 团队基于 Arrow 开发了一个内存数据服务,叫做 Plasma 。Plasma 在 Linux 共享内存创建了 Arrow 封装的对象,单独作为一个进程运行。其他进程可以通过 Plasma Client Library 来访问这块共享内存里的 Arrow 存储。这个功能是 Ray 团队开发的,贡献给 Arrow 作为 Arrow 生态的一部分。

除了 Plasma ,Ray 团队还贡献了一个叫做 Modin 的功能,就是基于 Ray 的分布式能力,提供了 Pandas 的实现。类似于 Dask 的 Dataframe。这个功能已经从 Ray 独立,成为一个独立的项目

软件架构

Ray 的基础结构可以参考 Paper https://arxiv.org/abs/1712.05889” 

Ray 分布式计算框架详解 1

经过几个版本的迭代,有些内容做了一些优化,主要的结构还是如上图。GCS 作为集中的服务端,是 Worker 之间传递消息的纽带。每个 Server 都有一个共用的 Object Store,也就是用 Apache Arrow/Plasma 构建的内存数据。 Local Scheduler 是 Server 内部的调度,同时通过 GCS 来和其他 Server 上的 Worker 通信。Object Store 时间也有通信,作用是传递 Worker 之间的数据。

在 Paper 里面描述了一个典型的远程调用流程:

Ray 分布式计算框架详解 2

可以看到,GCS 储存了代码、输入参数、返回值。Worker 通过 Local Scheduler 来和 GCS 通信。Local Scheduler 就是 Raylet, 是单机上的基础调度服务。

Ray 分布式计算框架详解 3

Object > 100 kb 会通过 Object Store 之间的并行 RPC 来传输,而不通过任务调度 RPC 来实现。Apache Arrow 在 0.15 之后提供了一个 Apache Arrow Flight 的 RPC 框架,0.16 又做了强化。不知道 Ray 的 Object 的并行传递是不是采用 Arrow Flight。下图是一个 任务调度的 RPC 示例图:

Ray 分布式计算框架详解 4

以上两个示意图来自于How Ray Uses gRPC and Arrow to outperform gRPC

下面我们详细来了解了一下 Raylet

Raylet

我重新画了一个简单一点的 Worker 和 GCS 的关系图:

Ray 分布式计算框架详解 5

Raylet 在中间的作用非常关键,Raylet 包含了几个重要内容:

Node ManagerObject Managergcs_client 或者 gcs server

Node Manager 是基于 boost::asio 的异步通信模块,主要是通信的连接和消息处理管理;Object Manager 是 Object Store 的管理;gcs_client 是连接 GCS 客户端。如果设置RAY_GCS_SERVICE_ENABLED 为 True 的话 ,这个 Server 就是 作为 GCS 启动。

我们先看一下 Raylet 的启动过程:

Ray 分布式计算框架详解 6

首先,要做 Raylet 的初始化,这里面包含很多参数,包括 Node Manager 和 gcs client 的初始化。然后 Raylet Start 之后,注册 GCS,准备接收消息。一旦有消息进来,就进入 Node Manager 的 ProcessClientMessage 过程。在解释 ProcessClientMessage 的操作之前,我们需要了解一下 Ray Worker 和 Raylet 的进程/线程和通信的模型

通信模型

Ray 采用的是 Boost::asio 的异步通信模型,这里有一个很丰富全面的关于 asio 的介绍

Ray 分布式计算框架详解 7

Asio 采用的是 Proactor 模型。一个操作经过 Initiator 之后分解为 Asynchronous Operation Processor(AOP) 、Asynchronouse Operation(AO) 和 Completion Hanlder(CH) 。AOP 做具体的工作,执行异步操作。执行完成之后,把结果放入Completion Event Queue(CEQ)。Asynchronous Event Demultiplexer(AED)等待 CEQ ,如果 CEQ 出现完成事件,则返回一个完成事件到 CH

Raylet 启动了一个 main_service , 是 boost::asio::io_service 。io_service 也是 asio 运转的核心组件。前面的AOP、AED 和 Proactor 都是由 io_service 串联起来的。io_service 内部实现了一个任务队列,队列的任务就是void(void) 函数

io_service 的接口有 run 、run_one、poll、poll_one、stop、reset 、 dispatch 、post 。run 方式就是轮询执行队列里面的所有任务,无任务执行的时候就 epoll_wait 上阻塞等待


// Initialize the node manager.
boost::asio::io_service main_service;
main_service.run();

Node Manager 在初始化的时候,会按照 num_initial_workers 的数量初始化 worker pool 。然后Node Manager 会按照 asio 的异步机制,分配任务到这些 worker pool 里面的进程

接下来我们看一下 Raylet 、Worker 和 GCS 的消息传递和调度机制

消息传递和调度

Ray 后面的公司 Anyscale.io 的 blog 有一篇文章,叫做 Fast Scheduling in Ray 0.8” 。讲了怎么在 ray 0.8 里面优化调度

Ray 分布式计算框架详解 8

Worker 提交 task 到 raylet,raylet 分配 task 到其他 worker。同时 raylet 还需要把 task 、相关 worker 信息提交给 GCS。task 执行的参数和返回都需要通过 Object Store 来获取

接下来,我们看一下详细的消息传递和对应的一些执行过程

先看一下 Submit Task 这个操作: Worker 提交一个 Task ,就调用 SubmitTask 的任务到 Raylet 。Task 在 Raylet 内部有一个 Lineage 的机制。这个也是上面 Anyscale 图里面的 task lineage

我们先了解一下 Task Lineage 的机制

Task Lineage

Task Lineage 里面包含几个概念,Lineage Cache 、Lineage Entry 和 Lineage 。Lineage 是管理 Task 执行的 DAG (有向无环图) ;Lineage Entry 是对 Task 状态的一些管理;Lineage Cache 是对 Task 在本机执行缓存的管理。在上面 Fast Scheduling in Ray 0.8” 文章里面,主要就是通过对 Lineage 的优化来提升 Ray 0.8 的调度性能

Ray 分布式计算框架详解 9

在 Ray 0.8 里面,把调用其他 Worker 的流程,从 Raylet 到 GCS 然后到 woker ,改为直接查询 Lineage Cache,如果 Worker 曾经调用过,就直接请求对应的 Worker。减少调用路径,提升效率。

回到我们对 Lineage 的分析

Task 在 GCS 里面有几个状态:None、Uncommitted、Committing、Committed 。None 意思是在 Lineage Cache 里面不存在;当任务从 Woker 提交之后,是 uncommited 状态了;当任务发生一些变化,经过一些操作或者重新提交,就是 Committing 状态。意思就是正在进行 Committing,等待返回状态;提交的任务得到了反馈,就是 Committed 状态。但是有一个不同,任务没有删除,当下一个任务还是调度这个 Worker 的时候,就可以直接调用这个 Task Entry 来实现。这就是上面说的优化过程

TaskEntry 保存 Task 的状态和相关的联系。主要包含这么几个内容:

GcsStatus:就是上面说的 Task 的状态parent_task_ids_:一个 Set ,保存了 Task 的父任务 ID 列表forwarded_to_:一个 Set,保存了任务明确提交之后提交到的 Node Manager 的 ID 列表

Lineage 维护了两个 map。一个是 Task 和 LineageEntry 的 map;一个是 TaskID 和 TaskID Set 的 Map。第二个的意思就是 Task 和它子 Task 组的映射

LineageCache 是 Task 的 Cache Table 。包含了 Task 的信息和状态。Lineage Cache 的策略是把所有的任务成为 Uncommitted 状态。为了安全起见,只有当 Task 的父任务都删除了,子任务才能删除

Lineage 的细节还很多,而且还处在优化的状态。我们先看看通过一个 Task 提交的过程来看看 Lineage 是怎么运转的

提交任务

Submit Task 之后,先记录增加了一个 Task。然后拿到需要提交的 Task Spec,就是 Task 详细信息。然后提交。Task 有几个状态:

Placeable:就绪的状态,可以分配到 Node, 可以是本地或者远端。分配的原则根据资源状况,例如本地的内存、是否超过 Task 最大数量等。如果本地资源不够,就会提交到其他的 Node ,也就是服务器。当然,如果其他 Node 资源也不够,就会继续分配。WaitForActorCreation:这个转改是针对 Actore Task ,代表 Actor 方法等待 Actore 完成返回Waiting:Task 在等待它的参数的依赖关系满足要求。也就是 Task 的参数需要放到 local object storeReady:Task 可以运行,所有的参数已经传输到 local object store 了Running:Task 已经分配冰运行到一个 workerBlocked:Task 暂停。可能是因为 Task 正在等待启动其他 Task 并且等待结果返回Infeasible:Task 所需要的资源所有机器都不满足Swap:两个状态中的一个转换状态。例如一个 Ready 状态的 Task ,提交到了一个 worker,在等待返回的时候。就处在 Swap 状态。如果 Worker 接收了这个 Task,task 状态会变为 Running,否则它就会返回到 Ready 状态

在 design_docs/task_states.rst 文档里面有一个图描述了 Task 的状态变化过程:

Ray 分布式计算框架详解 10

在 SubmitTask 最后:

// if the task was forwarded.

if (forwarded) {
// Check for local dependencies and enqueue as waiting or ready for dispatch.
} else {
// (See design_docs/task_states.rst for the state transition diagram.)
local_queues_.QueueTasks({task}, TaskState::PLACEABLE);
ScheduleTasks(cluster_resource_map_);
}

如果要提交的 Task 需要 forward (在收到 HandleForwardTask 操作的时候),则进行 Task 如队列操作。入队的时候,如果参数都满足,也就是本地资源足够。Task 就入队列,成为 READY 状态,如果不满足,就是 WAITING 状态。同时改变 Task 状态为 Pending

if (args_ready) {
local_queues_.QueueTasks({task}, TaskState::READY);
DispatchTasks(MakeTasksByClass({task}));
} else {
local_queues_.QueueTasks({task}, TaskState::WAITING);
}

task_dependency_manager_.TaskPending(task);

调度策略

在 SubmitTask 之后,如果不是 forward ,则执行两个操作:

local_queues_.QueueTasks({task}, TaskState::PLACEABLE);
ScheduleTasks(cluster_resource_map_);

第一个是把 Task 放到本地,并且把 Task 状态置为 Placeable;第二是把 Task 在集群进行调度

Ray 针对 Task 有两个 Queue:ReadyQueue、SchedulingQueue 。 ReadyQueue 是已经准备好的 Task 的队列;SchedulingQueue 是已经提交的 Task 的队列。这两个队列用来存储不同状态的 Task,实现上面说的 Task 状态变化过程。

调度任务的步骤是这样:

先尝试把 Tasks 放在 Local Node如果 Local Node 有资源如果没有合适的计算资源,就采用硬分配的方式,给 Client 安排计算资源

可以看出,调度就是基于资源的分配。资源包括计算、内存/数据,在 Ray 体现为 Worker 、Task 、Object Store(Arrow)。所以我们需要搞清楚资源,才能更好的理解调度

集群架构

按照上面的描述,Ray 集群有 Worker 、Gcs 和 Raylet 等模块。Worker 是一个执行单元。 Worker 的执行是通过 gRPC 来远程提交的。整个架构有点像 istio 的 service mesh 的结构

Ray 分布式计算框架详解 11

对应以上的粗粒度的组件,拆解开来就像下面这样:

Ray 分布式计算框架详解 12

这里面有几个关键组件。Raylet 是处理 Worker 和 GCS 的关键连接点,还有处理 Local Worker 之间的调度。Raylet 里面包含 Node Manager,这是处理消息传递和调度的基础模块;还有 Object Manager ,这是处理本机 Arrow 内存读取的组件,相对容易理解;Core worker 组件针对 Python Driver 提供支持,主要是完成 Task 的调度。就是 python 里面使用 ray 时候需要加的 remote 注解。这个是 Ray 的核心。Python Driver 主要是针对 python 提供支持,当然 Ray 也有 Java Driver ,这里没有列出

我们先从 Raylet 看起

Raylet

在 Raylet 初始化的时候,初始化了一个 main_service 。 这是一个boost::asio::io_service 实例。这个在上面的通信模型里面简单描述了一下 asio 的机制。main_service 在 main.cc 启动(main_service.run()),main_service 的引用传递到了 Raylet ,然后 Raylet 应用传递到了 Node Manager

Node Manager(下称 NM)是 Raylet 的一个负责通信的模块,处理 Raylet 和其他分布式节点(服务器)、Worker、Task 分配还有 GCS 的通信

从 Raylet 到 Node Manager 的入口在 HandlerAccept :

ClientHandler client_handler =
[this](LocalClientConnection &client) { node_manager_.ProcessNewClient(client); };
MessageHandler message_handler =
[this](std::shared_ptr client, int64_t message_type,
const uint8_t *message) {
node_manager_.ProcessClientMessage(client, message_type, message);
};
// Accept a new local client and dispatch it to the node manager.
auto new_connection = LocalClientConnection::Create(
client_handler, message_handler, std::move(socket_), "worker",
node_manager_message_enum,
static_cast(protocol::MessageType::DisconnectClient));

client_handler 是处理连接请求,message_handler 是处理这个 Client 的消息。LocalClientConnection 是一个针对客户端请求到服务端的抽象,主要是基于 asio 机制把读写,和异步读写封装了一下

ProcessNewClient 主要是记录 Client 的一些信息 ProcessClientMessage 就是上面架构图里面的消息处理,对应不同的消息处理流程。可以看一下附录:《Node Manager 处理消息的列表》

这里面最重要的一个,是 SubmitTask。是针对 task 的处理,Task 作为主要任务调度的模块,贯穿 Ray 分布式任务调度的全过程。所以我们有必要从源头来了解和跟踪一下 Task 的发起到完成的整个过程。同时,我们也可以通过这个过程,了解从 Python Driver 到 Core Worker ,然后到 Raylet 的处理过程。

Submit Task

Task 是表示一个任务及其执行的资源等信息。Task 的发起是从 Python Driver

@ray.remote
def borrower(inner_ids):
inner_id = inner_ids[0]
ray.get(foo.remote(inner_id))

inner_id = ray.put(1)
outer_id = ray.put([inner_id])
res = borrower.remote(outer_id)

例如以上的代码,@ray.remote 注解下面的函数,就是一个执行体。对应的是 RemoteFuntion Class 。在 _remote 这一段:

self._pickled_function = pickle.dumps(self._function)

_function 序列化为 _pickled_funtion,然后再 hash 为 pickled_function_hash

self._function_descriptor = PythonFunctionDescriptor.from_function(
self._function, self._pickled_function)

def from_function(cls, function, pickled_function):
pickled_function_hash = hashlib.sha1(pickled_function).hexdigest()

然后就调用了 Core Worker 的 SubmitTask

Status SubmitTask(const RayFunction &function, const std::vector &args,const TaskOptions &task_options, std::vector *return_ids,int max_retries);

在 SubmitTask 里面。生成一个 task id ,然后通过 BuildCommonTaskSpec 函数,把 Task 所有信息封装成一个 TaskSpecification 实例。然后把这个 TaskSpecification 提交到 Task 的任务队列里面。

if (task_options.is_direct_call) {
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, max_retries);
return direct_task_submitter_->SubmitTask(task_spec);
} else {
return local_raylet_client_->SubmitTask(task_spec);
}

这里面 Task Manager 是对 Task 管理的一个封装。包含了对应的内存 in_memory_store_、引用计数 reference_counter_(主要用作对 ObjectID 的管理,用在 GC 上),任务的状态和 Retry 次数等。这里面 task_manager->AddPendingTask ,主要是针对 Task 提交前做了记录,记录 Task ID,为了 reference_manager_ 之后的 GC 用

is_direct_call 是针对 Actor worker 的直接调用。local_raylet_client_ 就是上面提到的 Raylet Client,Core worker 把接收到的 remote 调用提交到 Raylet ,Raylet 来做调度。就是下图红色的那一段:

Ray 分布式计算框架详解 13

在 Raylet 的 Node Manager 接收到 SubmitTask 消息,按照 Task 的依赖次序来提交 Task。意思就是,如果一个 Task B 依赖于另外一个 Task A,那就先提交 Task A

如果任务是提交到另外一个 Node(这个取决于 Lineage 调度,forwarded 是 True,forwarded 是 SubmitTask 的最后一个参数),则在 Lineage Cache 增加一个 UncommittedLineage

lineage_cache_.AddUncommittedLineage(task_id, uncommitted_lineage)

这里面第二个参数,是 SubmitTask 的时候,生成的一个 Lineage 的实例。

如果任务是提交到本地(forwarded 是 False,默认),则异步 commit task 到 GCS:

lineage_cache_.CommitTask(task)

调用的是 Lineage 的 CommitTask,然后调用 Lineage 的 FlushTask,接着调用 gcs_client_->Tasks().AsyncAdd 把 Task 状态提交到 GCS,然后根据返回状态更新本地 Lineage 的 Task 状态为 GcsStatus::COMMITTED。同时 Evict Task 和 UnSubscribeTask

在处理好 Lineage Cache 之后,SubmitTask 在本地的 Task Queue 里面增加这个 Task,然后调用 ScheduleTasks 来调度()

整个过程流程图如下:

Ray 分布式计算框架详解 14

在 DispatchTasks 里面,按照 class order 分配。避免其中一个任务执行的时候卡住,导致 Ray 启动多个 worker 来执行。这个问题在 #3644” 有描述。

参考文档

Ray Paper

Ray 分布式计算框架详解 15