Categories
程式開發

hive拉链表优化·百亿量级数据支持准实时更新


优化源于痛点(┬_┬)

有没有痛点取决于业务场景的需求;有多痛取决于当前方案对业务的契合度

让我们从业务场景①、当前方案②切入,联立①②来推导当前痛点③吧!

话不多说,开始分析

①业务场景:

1.表的数据量很大,时间长了可能会到百亿级的数据

2.表中的部分数据需要更新

3.需要查看历史变更记录

4.更新数量很低,但更新频率可能比较高

②当前方案:

采用了hive的拉链表,讲这个的博客比较多,我只讲一讲操作。我们现在是每天指定时间执行一次拉链表的操作,更改全部走kafka,从接口读到更新后存入kafka等待明日执行更新,使用的时候从代码中根据最新时间和状态字段对数据进行过滤

③当前缺陷:

1.无法实现准实时。看似能实现准实时,但其实只能以日更新的方式跑批。因为在百亿这个数据量级下,如果使用time来分辨每条数据的最终时间,数据分析的时间成本会过于高昂,只能使用类似于20200811这样的dt(datatime)作为分区字段来寻求较快的对比,因此,只能以每天执行一次任务的形式跑批(如果数据量较小,可以实现实时,但是数据量较小的话也没必要用hive,所以…)

2.每次数据分析的时候都要对数据状态进行过滤,非常浪费算力,还会严重拉慢分析速度,分析job多的话会很头疼

3.每条数据分析都要加相关筛选,以前的sql改起来直接炸裂

ps:另一种方案是每天都从这张表里拉出一张新的干净表,这样可以规避2,3,但是会新增一个问题,就是每天都需要浪费内存和算力以及时间去维护一张新表,且依然无法实现准实时

那么问题来了,是否存在什么更合适的方案来解决这些痛点呢?

我想,是有的。

根据痛点③,反推我们的预期目标④;

根据目标④,尝试推导出优化思路⑤;

落地思路⑤,成为最终的优化方案⑥

④预期目标:

1.实现准实时,能够在接口更新数据结束后半小时内完成数据的更新

2.实现物理删除,让分析任务纵享丝滑

3.更改尽可能小,最好不要对上下层的操作有任何影响

⑤优化思路:

1.从痛点来看,不能实现准实时的主要原因是更新数据和干净数据混在一张表里,筛选步骤严重影响了计算效率。因此,我们首先要将更新数据和干净的预期数据分离成两张表

2.如果不希望影响前后过程,那么我们期望实现物理删除。全量的数据覆盖是我们无法接受的,但如果数据按照分区分区较为均匀,每个分区的数据量我们都可以接受,对若干个分区的数据做覆盖,HDFS表示:小菜一碟。因此,我们需要找出一个能让数据分布较为均匀的分区字段,一般情况下,就是dt。接着,按照分区进行覆盖,就能实现物理删除啦!

⑥优化方案:

我们先把百亿级的数据分为两种情况

1.由时间累计产生的海量数据,数据分布较为均匀

2.一次性导入大量数据,数据分布不均匀

第2种情况可以通过以hash取模的结果来强制均衡数据,或者hive分桶的思路或许也可以(这块我还不太熟练,暂且一提)。第1种情况是我们的理想数据,可以直接按照dt来分区。下面,我按照第1种情况来分析,用同样的方法略微更换分区方法就可以解决第2种情况。

test:原表,存储海量数据,数据按照分区字段dt相对均匀的分布

zipper:增量表,保存所有来自接口的更新信息,字段对比原表增加一个statu,0代表新增,1代表删除。如果没有modify_time,增加这个字段,如果有,直接用

①创建临时表delete,start_time为kafka开始接入数据的时间,modify_time为写入增量表的时间,开始执行时间为kafka停止接入数据后的n分钟(根据实际需求自行设置),以防数据频繁接入触发程序一直执行

with delete as (
select id from zipper where modify_time>=start_time and statu=1
)

②从delete表获取原表对应的dt分区,hive本身不支持直接in,但是可以用left semi join来代替,这个写法的性能优于直接join

delete_dt as (
select dt from
test left semi join delete
on test.id=delete.id
)

③从dt分区搜索出所有不需要删除的数据,覆盖写入原表对应分区,实现物理删除。hive本身不支持直接not in,但是可以用where not exists来替代该逻辑。此处如果有人有更高效的方法,还请告知,我想进一步优化但是没能成功

dt_data as (
select * from test left semi join delete_dt
on test.dt=delete_dt.dt
)
insert overwrite test
select 原表字段们 from dt_data
where not exists (select id from delete where delete.id = dt_data.id)

④上面已经处理完删除数据,接下来从zipper获取新增数据,并插入原表对应分区

insert into test
select 原表字段们
from zipper where time>=start_time and statu=0;

最终代码

with delete as (
select id from zipper where statu=1
),
delete_dt as (
select dt from
test left semi join delete
on test.id=delete.id
),
dt_data as (
select * from test left semi join delete_dt
on test.dt=delete_dt.dt
)
select * from dt_data
where not exists (select id from delete where delete.id = dt_data.id);

以上就是我的优化方案,所有sql均在spark.sql中执行,优点如下:

1.分析任务不受任何影响

2.每次执行覆盖的数据少,执行快,可以结合kafka实现准实时更新;

3.需要更新的数据量不大,存储所有变更记录的zipper表不会有太多数据,查询变更记录方便,原表也一直是干净的,不需要做后续维护;

4.具备强一致性,通过java统一调度,规避了凌晨跑完该任务后,白天遇到更新要求,ES、Arango实时更新后,执行分析任务时再次写入ES、Arango,导致数据不一致的情况

5.全程最多为原表新增一个分区字段,完全不影响业务上下游

以上就是本次优化从思考到实现的全过程啦,希望大家喜欢(≧▽≦)