Flink DIM
维表 Join
简述
Flink SQL 官方支持多种 Join 方式,见 官方文档 对于 Flink 官方 Regular Join、Interval Join、Temporal Join,用户可以按照上述 Flink 官方的文档正常使用。 对于将 Arctic 表作为维表的 Lookup Join 场景,请按本文档的描述配置使用。
使用说明
注意:维表必须定义主键,并且 Join 条件必须包含所有主键字段。
以下是 Arctic 作为维表 Join 的使用方法:
-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;
-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled=true;
-- 创建一张 Arctic 表
CREATE TABLE IF NOT EXISTS arctic_catalog.default_db.`user` (
id INT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
);
-- 创建一张主表,需要将 proctime 作为 watermark。如下所示:
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
user_id INT,
order_time AS LOCALTIMESTAMP,
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
-- 给 Arctic 表添加 watermark。"opt" 可以是任意名称,但不能与 arctic_catalog.default_db.`user` 已有字段同名
CREATE TABLE user_dim (
opt TIMESTAMP(3),
WATERMARK FOR opt AS opt
) LIKE arctic_catalog.default_db.`user`;
-- 开启 Arctic 表流读和维表的配置
SELECT order_id, price, user_id, name, age
FROM orders
LEFT JOIN user_dim /*+OPTIONS('streaming'='true', 'dim-table.enabled'='true')*/
FOR SYSTEM_TIME AS OF orders.order_time
ON orders.user_id = user_dim.id
Key | 默认值 | 类型 | 是否必填 | 描述 |
---|---|---|---|---|
dim-table.enabled | false | Boolean | 否 | 是否将 Arctic 表作为维表使用,默认false。作为维表时需要设置为 true |
- 当 Arctic 作为维表(user 表)数据量很大时,需要一定的存量数据加载时间。在此期间,左表(orders)的数据会缓存在 Join 算子中,直到维表存量数据加载完, 才会触发 Join 算子的关联操作并向下游输出数据。
- 现阶段维表读 Arctic Filestore 会存在一定的时间延迟,如果左表(orders)的数据是毫秒级延迟的实时数据,需要指定允许一定时间的延迟,让左表数据缓存一段时间后,再触发 Join。 如允许 10s 的延迟:WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND,避免左表(orders)的数据比维表数据快,导致 Join 关联不上维表侧(user 表)的数据。
- 未来会基于 Arctic Logstore 实现更实时的维表
注意事项
- 如果主表有指定 watermark 的需求,如需要维表 Join 后作窗口计算,则可以自由指定:
CREATE TABLE orders ( order_id STRING, price DECIMAL(32,2), user_id INT, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH (/* ... */);
性能测试
本次性能测试主要面向维表有较大存量数据,并且主表持续流入数据的场景。有相关场景需求的用户可以将本次性能测试的结果作为一个参考,具体的测试条件和结果如下:
测试条件
- 主表:Flink DataGen 数据源,设置数据生成速率为10000条/秒
- 维表:初始数据量为10G大小,共1580万条记录数的 Arctic 表,并且有新的数据不断流入,流入速率为100条/秒
- Flink 版本:1.14.5
- Flink 任务配置
- 并发度:8
- TaskManager 数量:8
- TaskManager 内存:2G
- numberOfTaskSlots:1
- JobManager 内存:2G
- state backend: RocksDB
- state.backend.incremental:true
- Checkpoint 间隔:1min
- Checkpoint 超时时间:30min。需要说明的是,Flink 任务首次初始化维表数据到 Rocksdb 容易造成 Checkpoint 超时,因此本次测试将 Checkpoint 超时时间设置为30min
测试指标
- 维表存量数据的加载时间
- 维表存量数据加载完后第一次 Join 计算花费的时间
- Checkpoint 大小
- Failover 耗时
测试结果
测试指标 | 测试数值 |
---|---|
维表存量数据的加载时间 | 8min |
第一次 Join 计算花费的时间 | 3min |
Checkpoint 大小 | 1.86GB |
Failover 耗时 | 5s |