Categories
程式開發

在流式系统中如何引入Watermark支持:以Pravega和Flink为例


在流式计算的世界中,时间问题一直是困扰着业界的难点与痛点:如何能够更加精确地进行基于事件时间窗口的计算?Watermark的概念应运而生。Watermark试图将更加精确的时间参考引入流式计算,并取得了越来越多的流式平台的支持。Pravega也不例外,在最近的版本更新中(v0.6),Pravega已经加入了Watermark的完整支持。由于Pravega原生支持Segment级别的企业级动态缩放特性,在此基础上要实现Watermark并非易事。本文将按照“发现问题-解决问题”的线索,循序渐进地讨论Watermark机制在Pravega中的设计和实现,并对比Flink的实现。本文最早出自Pravega的官方博客。

1 动机

流式处理(Stream Processing)从广义上指的是从无界数据源注入数据并在注入的过程中进行数据处理的能力。这些数据可以是用户生成的数据,例如社交网络或其它在线应用;也可以是机器生成的数据,例如来自物联网和边缘应用的服务器遥测数据或传感器样本。

典型的流式数据处理应用通常按照数据产生的顺序依次处理数据。在实际应用中,由于以下原因,严格按照全序处理数据通常是无法实现的:

  1. 数据源本身就不是一个单一的元素,它可能由多个用户,服务器或者网关组成;

  2. 应用的内在设计也可能导致不同数据项目被乱序注入和处理。

因此,在Pravega和其它类似系统中,顺序都指的是数据注入的顺序,并且由“键”确定。“键”这一概念连结了数据流中的各个元素。

按生成顺序处理数据是流式处理最有意思的一面,因为这使得应用程序可以在不同事件中建立起一种临时的相关关系,尽管这种关系比较松散。例如,某个应用程序能够涉及这样的提问:在过去的一小时中有多少不同的用户登录了,或者在过去 的十分钟内有多少传感器报告了异常读数。为了实现并回答这些查询,应用程序必须能够为每一个报告周期生成相应的结果(第一个例子的报告周期是一小时,而第二个例子的报告周期是十分钟)。这些报告周期通常被称作时间窗口(Time Window)。

在数据生成时就进行数据处理使得应用程序可以在数据生成的同时就输出结果。对于有界数据集(不会新增数据),可以通过使用map-reduce对所有窗口并行地进行窗口聚合。而这对无界数据集(流)却并不可行,因为数据一直在动态持续增长。因此,对于持续生成的数据源,可以选择用map-reduce的方式周期性地处理数据集快照或增量(这将引入更长的处理时间),也可以用流的方式在数据注入的同时就进行处理。相对于周期性地处理,后者可以提供更低的端到端时延。

为了进行诸如窗口聚合之类的计算,首先必须拥有某种时间参考,并且使得每个数据元素(例如:消息,事件,记录等)都与一个时间值相关联。如果没有一个时间参考,应用程序就无法确定一个数据元素究竟属于哪个时间窗口。典型的用于讨论时间参考的时域包括事件时间(Event Time)和处理时间(Processing Time)。事件时间指的是数据源赋予事件的时间,通常用的是事件生成时的挂钟时间。处理时间用的是事件被进行数据处理时的时间作参考。某个事件所关联的时间要么是在应用程序从Pravega读取数据的时候被确定,要么是在事件被处理的时候确定。此外,我们还考虑注入时间(Ingestion Time),即进行注入的应用程序收到事件的事件。例如,在某个利用Pravega进行流式数据存储的应用程序中,注入时间就是事件被写入Pravega Stream的时间。图 1展示了上述三个时域。

在流式系统中如何引入Watermark支持:以Pravega和Flink为例 13

图 1 Pravega中的时间

由于这三种时域各自将事件生命周期中的不同时间点与一个事件相关联,它们必然存在差异。当数据源在事件生成的同时就立刻进行发送时,事件时间和注入时间之间的差异通常较小。但是由于网络连接原因,也可能出现一些具有显著偏差的离群点。注入时间和处理时间之间的差异取决于注入过程和处理过程的实际发生时间。事实上,对于Pravega来说,这一差值可能会相当大,因为Pravega是一个存储系统,数据在被注入之后可能在任意时间之后才被应用程序处理。在Pravega中,我们将这种在任意长时间之前就已经注入的老数据称为历史数据(Historical Data)。

能够使用我们上述讨论过的时域之一将一个时间值关联到一个事件上还远远不够。应用程序的确可以从一个时间戳推断出某个事件属于哪个时间窗口,但它如何能知道它已经收到了某个时间窗口内的所有事件并且可以关闭当前窗口了呢?在处理时间是连续递增的假设下,关闭一个基于处理时间的窗口非常简单,但对于事件时间和注入时间就完全不同了。对于事件和处理时间,进行数据处理的应用程序需要知道它何时(即便只是估计)能够关闭一个给定的窗口并报告计算结果。当然,应用程序也可以选择永远不关闭时间窗口并且持续重复处理窗口内数据。但是,在某个时间点,应用程序总是需要调用和使用最终的计算结果,然后向前推进,这已经等价于关闭当前时间窗口了。

为了让应用程序能够对时间窗口结束进行断言,我们需要知道事件关联时间的下界,而这些下界就被称作Watermark。Watermark w保证所有时间戳小于w的事件都已经被读取或者处理了(究竟是读取还是处理的语义则要依赖上下文确定)。然而,迟到的事件(Late Event)总是有几率发生。如何处理和最小化迟到事件则依赖具体的应用程序实现。图 2展示了Watermark的概念。

在流式系统中如何引入Watermark支持:以Pravega和Flink为例 14

图 2 时间和Watermark

为了计算基于时间窗口的聚合,我们需要能够将事件映射到窗口并且知晓何时能够关闭窗口(计算当前窗口内的聚合)。即便我们假设只有单一的事件序列,顺序赋值的方法也是行不通的,因为事件可以乱序出现。图中,事件7和事件8就出现了这种乱序。因此,对于每一个事件我们都需要一个时间参考,以便确定将它分配到哪个时间窗口。我们还需要知道何时能够关闭一个时间窗口,而Watermark正是这样一种抽象:通过提供时间下界允许窗口正确关闭。现实中,要提供严格的Watermark保证是极其困难的。分布式系统的异步本质使得为迟到事件提供强保证变得非常复杂。另外,从进度所关注的角度看,提前关闭时间窗口并允许一小部分迟到事件往往是一种较好的选择。这种选择通常是依赖具体应用程序的。

在这篇文章中,我们将会讨论Pravega新增的对事件时间和注入时间的支持。我们必须克服的关键难点之一就是如何在流式数据的Segment集合会因自动缩放机制而动态变化的情况下提供Watermark的支持。我们对Pravega的Reader Group加入了内部支持,以便简化与流式处理器的关联,例如Apache Flink。我们用Apache Flink作为基于Watermark的流式处理器的一个典型例子,讨论Flink对Watermark的支持以及与Pravega的Flink连接器(Connector)的集成问题。我们还会对如何与任意应用程序集成进行总结,并根据我们对该特性现有的经验给出建议。

2 示例:Apache Flink

Apache Flink是一个为流式和批式数据而设计的开源平台,而我们编写了一个连接器允许应用程序可以使用Flink处理Pravega的流式数据。Flink由一个允许应用程序编写作业(Job)的编程模型和一个执行Flink程序的分布式运行时环境构成。在运行时,Flink环境把一个程序映射成一个数据流,而这个数据流由一个或多个源(Source),一系列变换算子(Operator)以及一个或多个汇(Sink)组成。在本文关于Watermark的讨论中,源是最有意思的元素,因为正是由它利用Pravega的时间信息产生Watermark。

Watermark是Apache Flink中的核心概念。它允许一系列基于时间的计算,例如不同时域下的时间窗口:事件时间,注入时间和处理时间。在Flink中,它们被称作时间特征(Time Characteristics)。事件时间和注入时间在Flink中有着不同的定义。在Flink中,注入时间代表事件进入Flink数据流时的时间,而不是指事件被注入数据管道(例如写入Pravega)的时间。事件时间代表应用程序赋予的时间值,它涵盖了由应用程序确定的任意形式的时间和Watermark,包括在源端进行的基于Pravega传播的时间信息的赋值。因此,事件时间是Flink的时间特征,包含了Pravega所提供的事件时间和注入时间。图 3展示了这些不同的时间特征以及与Pravega的区别。

在流式系统中如何引入Watermark支持:以Pravega和Flink为例 15

图 3 Flink和Pravega中的时间

为了确定在一个作业究竟使用何种时间特征,Flink需要程序在执行环境中设置:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

当在Flink中使用事件时间时,必须对事件进行时间戳赋值,并且系统需要Watermark作为事件时间的进度指标量。这有两种实现方法:通过源直接进行或者通过时间戳赋值器(同时负责产生Watermark)。时间戳赋值器是作业规格(Job Specification)的一部分,它必须在第一个使用时间的操作发生之前被指明(通常在源之后)。时间戳赋值器将覆写源直接生成的时间戳和Watermark。

与Pravega最相关的选项就是让源赋值时间戳并且产生Watermark。当使用这一选项时,我们可以使用本文所描述的方法,在Pravega连接器中加入时间戳赋值和Watermark生成的支持。支持事件时间的Flink源需要调用如下方法:

  • SourceContext#collectWithTimestamp(T element, long timestamp): 从源中产生一个事件,并赋值时间戳。

  • SourceContext#emitWatermark(Watermark mark): 产生Watermark。

在接下来两个小节,我们将给出我们的设计,在Pravega中支持Watermark。在我们讨论完设计与实现之后,我们将回头展示更多与连接器集成的细节。

3 难点

假设现在我们有一个简单的应用程序和一组产生事件的传感器,一个Pravega流,以及一个Flink作业。就目前的讨论而言,究竟这个作业在进行怎样的操作并不重要,但我们假设它正基于Pravega的流进行某种形式的时间窗口聚合,并且它需要知道时间窗口的边界。

如果传感器本身能够进行时间戳赋值,这样写入Pravega的事件都附加着时间戳信息,那么Flink的作业源就可以提取这些时间戳并具有某种时间进度的概念。尽管这是一个合法的方法,但这么做有两个严重的缺点:

  1. 对于一个给定的时间戳,我们并不知道是否还有一个具有相同时间戳的事件,因此我们无法推进Watermark。

  2. 如果Flink源没有收到事件,它就不知道究竟是事件时间仍在向前推进而仅仅只是没有新事件产生,还是系统正在经历异步过程(例如事件被任意延迟)。

通常说来,迟到事件不可能完全避免,因为有太多情况可能导致迟到事件,例如连接或者节点不可用。但是,源和应用程序一般都有事件时间的相关信息(例如自身的时钟),并且在理想情况下我们应当传播这些信息以便Flink源可以更加精确地推进事件时间。

现在我们看一下如何用Pravega实现这些。假设我们周期性地向Pravega流的字节序列里写入标记来表征时间进度。这些标记指明所有事件时间早于这个标记的所有事件都已经写入了。这么做会带来三个问题:

  1. 具有多个Writer的Pravega流需要协调标记的写入,保证它们反映出所有Writer的状态。

  2. Pravega流通常都不是一个简单的字节序列,它一般由多个并行的Segment构成。

  3. Segment的内部实现是一组字节序列,因此将标记这种控制数据与应用程序数据混合存储在一起并不是一个好方法。

为了解决问题1,我们需要某种机制来参考所有已知的Writer,而问题2要求标记能够反映跨Segment的位置。对于问题3,我们需要在外部维护标记。图 4展示了一个往Pravega注入事件并进行处理的应用程序的时间流。

在流式系统中如何引入Watermark支持:以Pravega和Flink为例 16

图 4 Pravega中的时间流

参考所有Writer并不是一件简单的事,因为Writer可以在线也可以离线。我们选择的任何机制都必须考虑Writer集合的这种动态性。在外部保存标记的同时还要能够将它们映射到跨Segment的位置,我们需要某种数据结构来维护这种Segment到偏移量的映射关系,并且我们需要在流数据之外维护这些标记,例如在一个单独的Segment中。

现在,还有一个问题需要解决:空闲Reader。Reader Group协调对组内Reader的Segment分配。假设一个给定的Reader没有被分配到Segment。这种场景是可能存在的,例如,当组内Reader的数量大于Segment数量的时候。在这种情况下,一个没有被分配到Segment的Reader如何能够知道事件时间在向前推进?为了让空闲Reader在没有被分配到Segment的时候也能够产生Watermark,我们通过Reader Group的状态同步器(State Synchronizer)来协调事件时间的推进。这种协调使得所有Reader可以不依赖Segment的分配而推进事件时间。

到目前为止,我们一直在讨论时间却始终没有说明时间参考究竟从何而来。这是有意而为之:我们不想限制应用程序使用特定的时间参考,或者限定这种时间参考何时开始存在。这种时间参考可以是挂钟时间,非常接近数据生成时的当前时间,也可以是从文件读取事件时的任意过去时间点。我们不想试图规定或强制任何对时间赋值的方法,尤其是对于事件时间,我们希望应用程序可以根据自身的设计使用任何有意义的方法设置这个值。

在接下来的几个小节中,我们会详述我们的设计和实现。许多我们已经讨论过的抽象概念都会在余下的章节中具现化。

4 Pravega对Watermark的支持

Pravega的Watermark机制由三个主要部分组成,如提案文档所述:获取时间,时间戳聚合以及时间窗口的获取。

4.1 获取时间

首先是EventStreamWriter上的一个API,用于记录时间。这允许一个进行数据写入的应用程序向Pravega表明当前正在写入的数据所对应的时间。

EventStreamWriter writer = clientFactory.createEventWriter(stream, serializer, EventWriterConfig.builder().build()); 
//... write events ... 
writer.noteTime(currentTime);

这里,“noteTime ”API可以被周期性地调用,指明所有已经写入的事件都发生在某个时间之前。

这个API的结构使得那些不关心Watermark的应用程序不必额外做任何事情。此外,它还允许应用程序定义自己的时间概念。

类似地,对于事务性Writer,在事务的commit()方法上有一个可选参数,允许应用程序指明当前事务所写入事件的时间。

Transaction txn = writer.beginTxn();
//... write events to transaction. 
txn.commit(txnTimestamp);

noteTime()方法和commit()方法都接受一个时间戳参数,而并非直接查询系统时钟。这允许用事件时间的方式定义时间。

如果正在进行事件写入的进程并不是事件的真正生产者,例如事件来自Web前端,移动App,或者嵌入式系统,那么事件的发生与写入之间一定存在时间差。这同样适用于事件本身就是从某个上游源头导出的场景。例如,从某个流读取数据,用某种方式进行数据处理(比如聚合),然后再将其写入另一个流,这是非常常见的应用。

如果你的应用程序不需要定义时间,那么可以直接使用注入时间:有一个名为automaticallyNoteTime配置参数可以提供这一行为。你可以这样配置:

EventStreamWriter writer = clientFactory.createEventWriter(stream, serializer, EventWriterConfig.builder().automaticallyNoteTime(true).build());

当这一选项开启时,就无需再调用noteTime()方法了。

一旦获取了时间,流上的所有Writer都必须形成一个统一的视图。为了进行这种聚合,客户端在内部会将时间值与Writer的当前位置进行组合,并将信息发送给控制器(Controller)。

4.2 从多个Writer进行时间戳聚合

控制器从所有的Writer接收这些时间戳与位置信息。控制器这样做信息聚合:它从一个流上的所有Writer收集时间戳并输出一个Stream Cut,这个Stream Cut大于等于所有Writer当前位置的最大值,同时还输出所有Writer报告时间的最小值作为时间戳。如下:

在流式系统中如何引入Watermark支持:以Pravega和Flink为例 17

图 5 聚合Writer的时间戳

通过这样的方式聚合时间戳,当一个Reader的当前位置超过一个给定Stream Cut的时候就一定能保证已经读取了所有对应的事件。

当然,Writer可以在线也可以离线。很自然的,如果一个Writer关闭并且不再上线,我们不希望一直持有它的递增时间信息。为了排除这种情况,流上有一个名为timestampAggregationTimeout的配置参数。这一配置项指明当超过多长时间没有收到一个Writer的信息后,就把它排除在时间窗口计算之外。

为了让Reader可以读取这些聚合后的信息,控制器将聚合后的时间和Stream Cut信息写入一个特殊的Segment。这个Segment在Pravega内部被称为Mark Segment。Reader可以从这个特殊的Segment读取相应的信息来确定它们在流中的位置。

4.3 Reader获取时间窗口

最终,所有的Reader协调它们各自的位置信息,得到一个Stream Cut形式的组合位置信息。这有一点难理解,因为为了知道Reader相对于Mark Segment中所记录的Stream Cut的位置,Reader必须首先生成一个聚合后的Stream Cut。这需要Reader Group中所有Reader共同协作。我们是这样实现的:让每一个Reader都把它们的位置信息记录在一个状态同步器中。

一旦获取了一个位置信息,接下来就需要对它进行比较。事实上,比较的结果并不是一个单一的数值。例如,考虑如下Stream Cut上的一个Reader Group:

在流式系统中如何引入Watermark支持:以Pravega和Flink为例 18

图 6 一个时间上下界分别为T5和T2的Stream Cut

在这个例子中,Reader Group部分超越某个时间值,但又部分落后于它。如果你从Watermark的设计初衷考虑,这一切就都说得通了。数据在多个主机上被并行处理,我们想要确定这样一个时间点:在该时间点之前的所有事件都已经被处理了。

正因为如此,Reader收到的是一个TimeWindow数据结构而不是一个简单的时间数值。这正是Reader的分布区间。在上述例子中,时间上下界分别为T5和T2。这一时间窗口可以通过调用如下方法获取:

TimeWindow window = reader.getTimeWindow();

在这一过程中始终保持的不变量是,一个Reader得到的时间下界意味着所有早于这个时间点的事件都一定已经被读取了。还有一些极端的例子需要注意。

  • Reader Group可能位于当前流第一次记录的时间戳之前,在这种情况下,时间戳的下界无法准确定义。唯一可以确定的是,Reader Group处于第一个时间戳之前。

  • Reader Group可能位于控制器所记录的最后一个标记之后。例如,如果一个Reader Group正处于流的尾端并且消费速度紧跟注入数据,那么它很有可能在控制器聚合时间之前就完成了事件处理。此时,当应用程序调用getTimeWindow()方法时,返回的TimeWindow结构中,upperTimeBound成员可能为空值。类似地,lowerTimeBound成员也可能滞后于Reader的实际位置,因为它必须等待时间信息进行聚合操作。

  • TimeWindow结构是基于Reader当前已经读取的位置而不是应用程序处理的位置(因为Pravega根本无从知晓这一信息)。所以,如果应用程序由于Reader死亡而调用了readerOffline()方法指明需要重新处理事件,那么TimeWindow可能倒退以便反映出某些事件需要被重新处理,因为在Reader死亡的过程中,这些正在被处理的事件已经丢失了。

4.4 与处理逻辑的联系

在EventStreamReader接口上,getTimeWindow()方法返回一个TimeWindow对象。TimeWindow对象提供了时间的上界和下界。

这是一个基于拉取(Pull)而不是推送(Push)的模型,也就是说我们可以假设往流中注入一些“伪事件”。这一模型有如下优点:它无需强制为每一个流都处理时间,它允许时间在一个没有任何事件的流上向前推进,但最重要的是它为TimeWindow的计算频率提供了灵活性。

TimeWindow反映了流上的当前位置,因此,如果需要的话,可以在每次调用完readNextEvent()方法之后都调用它,或者也可以周期性地调用它以便将事件按窗口分组。

4.5 Flink连接器的示例

在Pravega的Flink连接器中就有这样一个例子,实现了TimestampsAndPeriodicWatermarksOperator接口:

@Override 
public void onProcessingTime(long timestamp) throws Exception { 
    // register next timer 
    Watermark newWatermark = userFunction.getCurrentWatermark(); 
    if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { 
        currentWatermark = newWatermark.getTimestamp(); 
         // emit watermark 
         output.emitWatermark(newWatermark); 
    } 
    long now = getProcessingTimeService().getCurrentProcessingTime(); 
    getProcessingTimeService().registerTimer(now + watermarkInterval, this); 
}

此处,连接器获取时间窗口,如果条件满足,则推进Flink的Watermark,生成新的Watermark并调度任务在一个可配的时间间隔后重新运行。

由于这段逻辑是在连接器上实现的,所有使用Pravega的Flink应用程序都可以通过使用标准Flink的API享受到基于事件时间或注入时间的Watermark的好处。

5 总结

处理流的尾部数据和历史数据是Pravega的两大组成特性。Pravega存储流式数据并使用统一的API允许应用程序在数据可用时立即处理或者在将来任意时间处理。为了结果的一致性,流式数据需要有一个时间参考以便使得结果与流式数据何时被处理无关,并且这也绑定了时间允许进行时间窗口计算,这是非常关键的一点。正是流式数据的这种对时间信息的需求使得我们对Pravega加入了Watermark的支持。

我们对Watermark的支持由以下几部分组成:将时间戳关联到Pravega的写入数据上,根据时间戳生成表征位置信息的Stream Cut,以及通过Reader对外暴露时间信息以便允许应用程序生成Watermark。某个Reader得到的时间信息是一个根据各Reader的位置生成的跨Reader时间范围。这个时间范围给出了所有Reader已经读取数据的下界以及在Reader Group上的分布跨度。

本文的方法是一个通用方法,并且支持任意应用程序生成单调递增的时间戳。我们选择Apache Flink作为首个集成对象,因为它对窗口聚合和Watermark具有高级支持。我们在Pravega的Flink连接器上加入了Flink支持,使得使用Pravega的Flink作业可以从Watermark中获益。我们期待未来的Pravega连接器可以提供类似的支持,并且独立的应用程序可以自己实现这种支持,因为本文已经展示了使用该API所需加入的逻辑是非常简单直白的。

References

(1) T. Kaitchuck, “Pravega Watermarking Support,” (Online). Available: http://blog.pravega.io/2019/11/08/pravega-watermarking-support/.

(2) J. Manyika, R. Dobbs, M. Chui, J. Bughin, P. Bisson and J. Woetzel, “The Internet of Things: Mapping the value beyond the hype,” McKinsey Global Institute, McKinsey & Company, 2015.
(3) T. Akidau, R. Bradshaw, C. Chambers, S. Chernyak, R. J. Fernández-Moctezuma, R. Lax, S. McVeety, D. Mills, F. Perry, E. Schmidt and S. Whittle, “The dataflow model: a practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing,” in Proceedings of the VLDB Endowment, Kohala Coast, Hawaii, 2015.
(4) “Apache Flink,” (Online). Available: https://flink.apache.org.

(5) “Pravega Connector for Flink,” (Online). Available: https://github.com/pravega/flink-connectors .
(6) “Flink Event Time,” (Online). Available: https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html.

(7) “Generating Timestamps / Watermarks,” (Online). Available: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html

(8) F. Junqueira, “Streams In and out of Pravega,” (Online). Available: http://blog.pravega.io/2018/02/12/streams-in-and-out-of-pravega/ .
(9) “PDP-33 Watermarking,” (Online). Available: https://github.com/pravega/pravega/wiki/PDP-33:-Watermarking.