跳转至

Flink DIM

维表 Join

简述

Flink SQL 官方支持多种 Join 方式,见 官方文档 对于 Flink 官方 Regular Join、Interval Join、Temporal Join,用户可以按照上述 Flink 官方的文档正常使用。 对于将 Arctic 表作为维表的 Lookup Join 场景,请按本文档的描述配置使用。

使用说明

注意:维表必须定义主键,并且 Join 条件必须包含所有主键字段。

以下是 Arctic 作为维表 Join 的使用方法:

-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;

-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled=true;

-- 创建一张 Arctic 表
CREATE TABLE IF NOT EXISTS arctic_catalog.default_db.`user` (
    id   INT,
    name STRING,
    age  INT,
    PRIMARY KEY (id) NOT ENFORCED
);

-- 创建一张主表,需要将 proctime 作为 watermark。如下所示:
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    user_id     INT,
    order_time  AS LOCALTIMESTAMP,
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

-- 给 Arctic 表添加 watermark。"opt" 可以是任意名称,但不能与 arctic_catalog.default_db.`user` 已有字段同名
CREATE TABLE user_dim (
    opt TIMESTAMP(3),
    WATERMARK FOR opt AS opt
) LIKE arctic_catalog.default_db.`user`;

-- 开启 Arctic 表流读和维表的配置
SELECT order_id, price, user_id, name, age 
FROM orders
LEFT JOIN user_dim /*+OPTIONS('streaming'='true', 'dim-table.enabled'='true')*/
    FOR SYSTEM_TIME AS OF orders.order_time
ON orders.user_id = user_dim.id
Key 默认值 类型 是否必填 描述
dim-table.enabled false Boolean 是否将 Arctic 表作为维表使用,默认false。作为维表时需要设置为 true
  • 当 Arctic 作为维表(user 表)数据量很大时,需要一定的存量数据加载时间。在此期间,左表(orders)的数据会缓存在 Join 算子中,直到维表存量数据加载完, 才会触发 Join 算子的关联操作并向下游输出数据。
  • 现阶段维表读 Arctic Filestore 会存在一定的时间延迟,如果左表(orders)的数据是毫秒级延迟的实时数据,需要指定允许一定时间的延迟,让左表数据缓存一段时间后,再触发 Join。 如允许 10s 的延迟:WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND,避免左表(orders)的数据比维表数据快,导致 Join 关联不上维表侧(user 表)的数据。
  • 未来会基于 Arctic Logstore 实现更实时的维表

注意事项

  • 如果主表有指定 watermark 的需求,如需要维表 Join 后作窗口计算,则可以自由指定:
    CREATE TABLE orders (
      order_id    STRING,
      price       DECIMAL(32,2),
      user_id     INT,
      order_time  TIMESTAMP(3),
      WATERMARK FOR order_time AS order_time
      ) WITH (/* ... */);
    

性能测试

本次性能测试主要面向维表有较大存量数据,并且主表持续流入数据的场景。有相关场景需求的用户可以将本次性能测试的结果作为一个参考,具体的测试条件和结果如下:

测试条件

  • 主表:Flink DataGen 数据源,设置数据生成速率为10000条/秒
  • 维表:初始数据量为10G大小,共1580万条记录数的 Arctic 表,并且有新的数据不断流入,流入速率为100条/秒
  • Flink 版本:1.14.5
  • Flink 任务配置
    • 并发度:8
    • TaskManager 数量:8
    • TaskManager 内存:2G
    • numberOfTaskSlots:1
    • JobManager 内存:2G
    • state backend: RocksDB
    • state.backend.incremental:true
    • Checkpoint 间隔:1min
    • Checkpoint 超时时间:30min。需要说明的是,Flink 任务首次初始化维表数据到 Rocksdb 容易造成 Checkpoint 超时,因此本次测试将 Checkpoint 超时时间设置为30min

测试指标

  1. 维表存量数据的加载时间
  2. 维表存量数据加载完后第一次 Join 计算花费的时间
  3. Checkpoint 大小
  4. Failover 耗时

测试结果

测试指标 测试数值
维表存量数据的加载时间 8min
第一次 Join 计算花费的时间 3min
Checkpoint 大小 1.86GB
Failover 耗时 5s