Categories
程式開發

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践


引言

当今生活节奏日益加快,企业面对不断增加的海量信息,其信息筛选和处理效率低下的困扰与日俱增。由于用户营销不够细化,企业App中许多不合时宜或不合偏好的消息推送很大程度上影响了用户体验,甚至引发了用户流失。在此背景下,友信金服公司推行全域的数据体系战略,通过打通和整合集团各个业务线数据,利用大数据、人工智能等技术构建统一的数据资产,如ID-Mapping、用户标签等。友信金服用户画像项目正是以此为背景成立,旨在实现“数据驱动业务与运营”的集团战略。目前该系统支持日处理数据量超10亿,接入上百种合规数据源。

一、技术选型

传统基于Hadoop生态的离线数据存储计算方案已在业界大规模应用,但受制于离线计算的高时延性,越来越多的数据应用场景已从离线转为实时。这里引用一张表格对目前主流的实时计算框架做个对比。

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 1

Apache Storm的容错机制需要对每条数据进行应答(ACK),因此其吞吐量备受影响,在数据大吞吐量的场景下会有问题,因此不适用此项目的需求。

Apache Spark总体生态更为完善,且在机器学习的集成和应用性暂时领先,但Spark底层还是采用微批(Micro Batching)处理的形式。

Apache Flink在流式计算上有明显优势:首先其流式计算属于真正意义上的单条处理,即每一条数据都会触发计算。在这一点上明显与Spark的微批流式处理方式不同。其次,Flink的容错机制较为轻量,对吞吐量影响较小,使得Flink可以达到很高的吞吐量。最后Flink还拥有易用性高,部署简单等优势。相比之下我们最终决定采用基于Flink的架构方案。

二、用户画像业务架构

用户画像系统目前为集团线上业务提供用户实时标签数据服务。为此我们的服务需要打通多种数据源,对海量的数字信息进行实时不间断的数据清洗、聚类、分析,从而将它们抽象成标签,并最终为应用方提供高质量的标签服务。在此背景下,我们设计用户画像系统的整体架构如下图所示:

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 2

整体架构分为五层:

  1. 接入层:接入原始数据并对其进行处理,如Kafka、Hive、文件等。
  2. 计算层:选用Flink作为实时计算框架,对实时数据进行清洗,关联等操作。
  3. 存储层:对清洗完成的数据进行数据存储,我们对此进行了实时用户画像的模型分层与构建,将不同应用场景的数据分别存储在如Phoenix,HBase,HDFS,Kafka等。
  4. 服务层:对外提供统一的数据查询服务,支持从底层明细数据到聚合层数据的多维计算服务。
  5. 应用层:以统一查询服务对各个业务线数据场景进行支撑。目前业务主要包含用户兴趣分、用户质量分、用户的事实信息等数据。

三、用户画像数据处理流程

在整体架构设计方案设计完成之后,我们针对数据也设计了详尽的处理方案。在数据处理阶段,鉴于Kafka高吞吐量、高稳定性的特点,我们的用户画像系统统一采用Kafka作为分布式发布订阅消息系统。数据清洗阶段利用Flink来实现用户唯一性识别、行为数据的清洗等,去除冗余数据。这一过程支持交互计算和多种复杂算法,并支持数据实时/离线计算。目前我们数据处理流程迭代了两版,具体方案如下:

1.0版数据处理流程

数据接入、计算、存储三层处理流程

整体数据来源包含两种:

  1. 历史数据:从外部数据源接入的海量历史业务数据。接入后经过ETL处理,进入用户画像底层数据表。
  2. 实时数据:从外部数据源接入的实时业务数据,如用户行为埋点数据,风控数据等。

根据不同业务的指标需求我们直接从集团数据仓库抽取数据并落入Kafka,或者直接从业务端以CDC(Capture Data Change)的方式写入Kafka。在计算层,数据被导入到Flink中,通过DataStream生成ID-Mapping、用户标签碎片等数据,然后将生成数据存入JanusGraph(JanusGraph是以HBase作为后端存储的图数据库介质)与Kafka,并由Flink消费落入Kafka的用户标签碎片数据,进行聚合生成最新的用户标签碎片(用户标签碎片是由用户画像系统获取来自多种渠道的碎片化数据块处理后生成的)。

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 3

数据服务层处理流程

服务层将存储层存储的用户标签碎片数据,通过JanusGraph Spark On Yarn模式,执行TinkerPop OLAP计算生成全量用户Yids列表文件。Yid是用户画像系统中定义的集团级用户ID标识。结合Yids列表文件,在Flink中批量读取HBase聚合成完整用户画像数据,生成HDFS文件,再通过Flink批量操作新生成的数据生成用户评分预测标签,将用户评分预测标签落入Phoenix,之后数据便可通过统一数据服务接口进行获取。下图完整地展示了这一流程。

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 4

ID-Mapping数据结构

为了实现用户标签的整合,用户ID之间的强打通,我们将用户ID标识看成图的顶点、ID pair关系看作图的边,比如已经识别浏览器Cookie的用户使用手机号登陆了公司网站就形成了对应关系。这样所有用户ID标识就构成了一张大图,其中每个小的连通子图/连通分支就是一个用户的全部标识ID信息。

ID-Mapping数据由图结构模型构建,图节点包含UserKey、Device、IdCard、Phone等类型,分别表示用户的业务ID、设备ID、身份证以及电话等信息。节点之间边的生成规则是通过解析数据流中包含的节点信息,以一定的优先级顺序进行节点之间的连接,从而生成节点之间的边。比如,识别了用户手机系统的Android_ID,之后用户使用邮箱登陆了公司App,在系统中找到了业务线UID就形成了和关系的ID pair,然后系统根据节点类型进行优先级排序,生成Android_ID、mail、UID的关系图。数据图结构模型如下图所示:

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 5

Gephi

1.0版本数据处理流程性能瓶颈

1.0版本数据处理流程在系统初期较好地满足了我们的日常需求,但随着数据量的增长,该方案遇到了一些性能瓶颈:

  1. 首先,这版的数据处理使用了自研的Java程序来实现。随着数据量上涨,自研JAVA程序由于数据量暴增导致JVM内存大小不可控,同时它的维护成本很高,因此我们决定在新版本中将处理逻辑全部迁移至Flink中。
  2. 其次,在生成用户标签过程中,ID-Mapping出现很多大的连通子图(如下图所示)。这通常是因为用户的行为数据比较随机离散,导致部分节点间连接混乱。这不仅增加了数据的维护难度,也导致部分数据被“污染”。另外这类异常大的子图会严重降低JanusGraph与HBase的查询性能。

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 6

Gephi

  1. 最后,该版方案中数据经Protocol Buffer(PB)序列化之后存入HBase,这会导致合并/更新用户画像标签碎片的次数过多,使得一个标签需要读取多次JanusGraph与HBase,这无疑会加重HBase读取压力。此外,由于数据经过了PB序列化,使得其原始存储格式不可读,增加了排查问题的难度。

鉴于这些问题,我们提出了2.0版本的解决方案。在2.0版本中,我们通过利用HBase列式存储、修改图数据结构等优化方案尝试解决以上三个问题。

2.0版数据处理流程

版本流程优化点

如下图所示,2.0版本数据处理流程大部分承袭了1.0版本。新版本数据处理流程在以下几个方面做了优化:

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 7

2.0版本数据处理流程

  1. 历史数据的离线补录方式由JAVA服务变更为使用Flink实现。
  2. 优化用户画像图数据结构模型,主要是对边的连接方式进行了修改。之前我们会判断节点的类型并根据预设的优先级顺序将多个节点进行连接,新方案则采用以UserKey为中心的连接方式。做此修改后,之前的大的连通子图(图6)优化为下面的小的连通子图(图8),同时解决了数据污染问题,保证了数据准确性。另外,1.0版本中一条数据需要平均读取十多次HBase的情况也得到极大缓解。采用新方案之后平均一条数据只需读取三次HBase,从而降低HBase六七倍的读取压力(此处优化是数据计算层优化)。

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 8

Gephi

  1. 旧版本是用Protocol Buffer作为用户画像数据的存储对象,生成用户画像数据后作为一个列整体存入HBase。新版本使用Map存储用户画像标签数据,Map的每对KV都是单独的标签,KV在存入HBase后也是单独的列。新版本存储模式利用HBase做列的扩展与合并,直接生成完整用户画像数据,去掉Flink合并/更新用户画像标签过程,优化数据加工流程。使用此方案后,存入HBase的标签数据具备了即席查询功能。数据具备即席查询是指在HBase中可用特定条件直接查看指定标签数据详情的功能,它是数据治理可以实现校验数据质量、数据生命周期、数据安全等功能的基础条件。
  2. 在数据服务层,我们利用Flink批量读取HBase的Hive外部表生成用户质量分等数据,之后将其存入Phoenix。相比于旧方案中Spark全量读HBase导致其读压力过大,从而会出现集群节点宕机的问题,新方案能够有效地降低HBase的读取压力。经过我们线上验证,新方案对HBase的读负载下降了数十倍(此处优化与2优化不同,属于服务层优化)。

四、问题

目前,线上部署的用户画像系统中的数据绝大部分是来自于Kafka的实时数据。随着数据量越来越多,系统的压力也越来越大,以至于出现了Flink背压与Checkpoint超时等问题,导致Flink提交Kafka位移失败,从而影响了数据一致性。这些线上出现的问题让我们开始关注Flink的可靠性、稳定性以及性能。针对这些问题,我们进行了详细的分析并结合自身的业务特点,探索并实践出了一些相应的解决方案。

CheckPointing 流程分析与性能优化方案

CheckPointing流程分析:

下图展示了Flink中checkpointing执行流程图:

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 9

Flink中checkpointing执行流程

  1. Coordinator向所有Source节点发出Barrier。
  2. Task从输入中收到所有Barrier后,将自己的状态写入持久化存储中,并向自己的下游继续传递Barrier。
  3. 当Task完成状态持久化之后将存储后的状态地址通知到Coordinator。
  4. 当Coordinator汇总所有Task的状态,并将这些数据的存放路径写入持久化存储中,完成CheckPointing。

性能优化方案:

通过以上流程分析,我们通过三种方式来提高Checkpointing性能。这些方案分别是:

  1. 选择合适的Checkpoint存储方式
  2. 合理增加算子(Task)并行度
  3. 缩短算子链(Operator Chains)长度

选择合适的Checkpoint存储方式

CheckPoint存储方式有MemoryStateBackend、FsStateBackend和RocksDBStateBackend。由官方文档可知,不同StateBackend之间的性能以及安全性是有很大差异的。通常情况下,MemoryStateBackend适合应用于测试环境,线上环境则最好选择RocksDBStateBackend。

这有两个原因:首先,RocksDBStateBackend是外部存储,其他两种Checkpoint存储方式都是JVM堆存储。受限于JVM堆内存的大小,Checkpoint状态大小以及安全性可能会受到一定的制约;其次,RocksDBStateBackend支持增量检查点。增量检查点机制(Incremental Checkpoints)仅仅记录对先前完成的检查点的更改,而不是生成完整的状态。与完整检查点相比,增量检查点可以显著缩短checkpointing时间,但代价是需要更长的恢复时间。

合理增加算子(Task)并行度

Checkpointing需要对每个Task进行数据状态采集。单个Task状态数据越多则Checkpointing越慢。所以我们可以通过增加Task并行度,减少单个Task状态数据的数量来达到缩短CheckPointing时间的效果。

缩短算子链(Operator Chains)长度

Flink算子链(Operator Chains)越长,Task也会越多,相应的状态数据也就更多,Checkpointing也会越慢。通过缩短算子链长度,可以减少Task数量,从而减少系统中的状态数据总量,间接的达到优化Checkpointing的目的。下面展示了Flink算子链的合并规则:

  1. 上下游的并行度一致
  2. 下游节点的入度为1
  3. 上下游节点都在同一个Slot Group中
  4. 下游节点的Chain策略为ALWAYS
  5. 上游节点的Chain策略为ALWAYS或HEAD
  6. 两个节点间数据分区方式是Forward
  7. 用户没有禁用Chain

基于以上这些规则,我们在代码层面上合并了相关度较大的一些Task,使得平均的操作算子链长度至少缩短了60%~70%。

Flink背压产生过程分析及解决方案

背压产生过程分析:

在Flink运行过程中,每一个操作算子都会消费一个中间/过渡状态的流,并对它们进行转换,然后生产一个新的流。这种机制可以类比为:Flink使用阻塞队列作为有界的缓冲区。跟Java里阻塞队列一样,一旦队列达到容量上限,处理速度较慢的消费者会阻塞生产者向队列发送新的消息或事件。下图展示了Flink中两个操作算子之间的数据传输以及如何感知到背压的:

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 10

首先,Source中的事件进入 Flink并被操作算子1处理且被序列化到Buffer中,然后操作算子2从这个Buffer中读出该事件。当操作算子2处理能力不足的时候,操作算子1中的数据便无法放入Buffer,从而形成背压。背压出现的原因可能有以下两点:1、下游算子处理能力不足;2、数据发生了倾斜。

背压解决方案

实践中我们通过以下方式解决背压问题。首先,缩短算子链会合理的合并算子,节省出资源。其次缩短算子链也会减少Task(线程)之间的切换、消息的序列化/反序列化以及数据在缓冲区的交换次数,进而提高系统的整体吞吐量。最后,根据数据特性将不需要或者暂不需要的数据进行过滤,然后根据业务需求将数据分别处理,比如有些数据源需要实时的处理,有些数据是可以延迟的,最后通过使用keyBy关键字,控制Flink时间窗口大小,在上游算子处理逻辑中尽量合并更多数据来达到降低下游算子的处理压力。

优化结果

经过以上优化,在每天亿级数据量下,用户画像可以做到实时信息实时处理并无持续背压,Checkpointing平均时长稳定在1秒以内。

五、未来工作的思考和展望

端到端的实时流处理

目前用户画像部分数据都是从Hive数据仓库拿到的,数据仓库本身是T+1模式,数据延时性较大,所以为了提高数据实时性,端到端的实时流处理很有必要。

端到端是指一端采集原始数据,另一端以报表/标签/接口的方式对这些对数进行呈现与应用,连接两端的是中间实时流。在后续的工作中,我们计划将现有的非实时数据源全部切换到实时数据源,统一经过Kafka和Flink处理后再导入到Phoenix/JanusGraph/HBase。强制所有数据源数据进入Kafka的一个好处在于它能够提高整体流程的稳定性与可用性:首先Kafka作为下游系统的缓冲,可以避免下游系统的异常影响实时流的计算,起到“削峰填谷”的作用;其次,Flink自1.4版本开始正式支持与Kafka的端到端精确一次处理语义,在一致性方面上更有保证。

日处理数据量超10亿:友信金服基于Flink构建实时用户画像系统的实践 11

作者介绍:

杨毅:友信金服计算平台部JAVA工程师

穆超峰:友信金服计算平台部数据开发高级工程师

贺小兵:友信金服计算平台部数据开发工程师

胡夕:友信金服计算平台部技术总监