跳转至

Getting Started

概要

Apache Flink 引擎可以在批流模式处理 Arctic 表数据。Flink on Arctic connector 提供读写 Arctic 数据湖的能力且满足数据一致性保证,也提供实时维表关联 Arctic 数据湖的能力。为了满足对数据实时性要求很高的业务,Arctic 数据湖底层存储结构设计了 Logstore,其存放着最新的 changelog 或 append-only 实时数据。

Arctic 集成了 Apache Flink 的 DataStream API 与 Table API,以方便的使用 Flink 从 Arctic 表中读取数据,或将数据写入 Arctic 表中。

Arctic Flink 目录下的文档均只针对 Mixed-Format 生效。如果您使用的是 Iceberg format 表,请参考 Iceberg 官方的用法 Iceberg Flink 用户手册

Flink Connector 包括:

  • Flink SQL Select 通过 Apache Flink SQL 读取 Arctic 表数据。
  • Flink SQL Insert 通过 Apache Flink SQL 将数据写入到 Arctic 表。
  • Flink SQL DDL 通过 Apache Flink DDL 语句创建/修改/删除库和表。
  • FlinkSource 通过 Apache Flink DS API 读取 Arctic 表数据。
  • FlinkSink 通过 Apache Flink DS API 将数据写入到 Arctic 表。
  • Flink Lookup Join 通过 Apache Flink Temporal Join 语法实时读取 Arctic 表数据进行关联计算。

版本说明:

Connector Version Flink Version Dependent Iceberg Version 下载
0.4.0 1.12.x 0.13.2 flink-1.12-0.4.0
0.4.0 1.14.x 0.13.2 flink-1.14-0.4.0
0.4.0 1.15.x 0.13.2 flink-1.15-0.4.0

Kafka 作为 Logstore 版本说明:

Connector Version Flink Version Kafka Versions
0.4.0 1.12.x 0.10.2.*
0.11.*
1.*
2.*
3.*
0.4.0 1.14.x 0.10.2.*
0.11.*
1.*
2.*
3.*
0.4.0 1.15.x 0.10.2.*
0.11.*
1.*
2.*
3.*

对 Arctic 工程自行编译也可以获取该 runtime jar

mvn clean package -pl ':arctic-flink-runtime-1.14' -am -DskipTests

Flink Runtime Jar 存放在 flink/v1.14/flink-runtime/target 目录。

环境准备

下载flink和相关依赖,按需下载 Flink 1.12/1.14/1.15。以 1.12 为例:

FLINK_VERSION=1.12.7
SCALA_VERSION=2.12
APACHE_FLINK_URL=archive.apache.org/dist/flink
HADOOP_VERSION=2.7.5

## 下载 Flink 1.12.x 包,目前 Arctic-flink-runtime jar 包使用 scala 2.12
wget ${APACHE_FLINK_URL}/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
## 解压文件
tar -zxvf flink-1.12.7-bin-scala_2.12.tgz

# 下载 hadoop 依赖
wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/${HADOOP_VERSION}-10.0/flink-shaded-hadoop-2-uber-${HADOOP_VERSION}-10.0.jar
# 下载 arctic flink connector
wget https://github.com/NetEase/arctic/releases/download/v0.4.0-rc2/arctic-flink-runtime-1.12-0.4.0.jar

修改 Flink 相关配置文件:

cd flink-1.12.7
vim conf/flink-conf.yaml
修改下面的配置:

# 需要同时运行两个流任务,增加 slot
taskmanager.numberOfTaskSlots: 4
# 开启 Checkpoint。只有开启 Checkpoint,写入 file 的数据才可见
execution.checkpointing.interval: 10s

将依赖移到 Flink 的 lib 目录中:

# 用于创建 socket connector,以便通过 socket 输入 CDC 数据。非 quickstart 案例流程可以不添加
cp examples/table/ChangelogSocketExample.jar lib

cp ../arctic-flink-runtime-1.12-0.3.0.jar lib
cp ../flink-shaded-hadoop-2-uber-${HADOOP_VERSION}-10.0.jar lib

Hive兼容

Arctic 0.3.1 版本开始支持 Hive 兼容的功能,可以通过 Flink 读取/写入 Arctic Hive 兼容表数据。当通过 Flink 操作 Hive 兼容表时,需要注意以下几点:

  1. Flink Runtime Jar 不包括 Hive 依赖的 Jar 包内容,需要手动将 Hive 依赖的 Jar 包放到 flink/lib 目录下;
  2. 创建分区表时,分区字段需要放在最后一列;当分区字段为多个字段时,需要全部放在最后;
  3. 对于 Hive 兼容表,建表方式读写方式与非 Hive 兼容的 Arctic 表一致;
  4. 支持 Hive 版本为 2.1.1

常见问题

写 Arctic 表 File 数据不可见

需要打开 Flink checkpoint,修改 Flink conf 中的 Flink checkpoint 配置,数据只有在 checkpoint 时才会提交。

通过 Flink SQL-Client 读取开启了 write.upsert 特性的 Arctic 表时,仍存在重复主键的数据

通过 Flink SQL-Client 得到的查询结果不能提供基于主键的 MOR 语义,如果需要通过 Flink 引擎查询得到合并后的结果,可以将 Arctic 表的内容通过 JDBC connector 写入到 MySQL 表中进行查看

Flink 1.15 版本下通过 SQL-Client 写入开启了 write.upsert 特性的 Arctic 表时,仍存在重复主键的数据

需要在 SQL-Client 中执行命令 set table.exec.sink.upsert-materialize = none,以关闭 upsert materialize 算子生成的 upsert 视图。该算子会影响 ArcticWriter 在 write.upsert 特性开启时生成 delete 数据,导致重复主键的数据无法合并