跳转至

Flink DDL

Create Catalogs

通过执行下述语句可以创建 flink catalog

CREATE CATALOG <catalog_name> WITH (
  'type'='arctic',
  `<config_key>`=`<config_value>`
); 

其中 <catalog_name> 为用户自定义的 flink catalog 名称, <config_key>=<config_value> 有如下配置:

Key 默认值 类型 是否必填 描述
metastore.url (none) String Arctic Metastore 的 URL,thrift://<ip>:<port>/<catalog_name_in_metastore>
default-database default String 默认使用的数据库
property-version 1 Integer Catalog properties 版本,此选项是为了将来的向后兼容性
properties.auth.load-from-ams True BOOLEAN 是否从 AMS 加载安全校验的配置。 true:从AMS加载;false:不使用AMS的配置。注意:不管该参数是否配置,只要用户配置了下述 auth.*** 相关参数,就会使用该配置进行访问
properties.auth.type (none) String 表安全校验类型,有效值:simple、kerberos 或不配置。默认不配置,无需权限检验。simple:使用 hadoop 用户名,搭配参数'properties.auth.simple.hadoop_username'使用; kerberos:配置 kerberos 权限校验,搭配参数'properties.auth.kerberos.principal','properties.auth.kerberos.keytab','properties.auth.kerberos.krb'使用
properties.auth.simple.hadoop_username (none) String 使用该 hadoop username 访问,'properties.auth.type'='simple'时必填。
properties.auth.kerberos.principal (none) String kerberos 的 principal 配置,'properties.auth.type'='kerberos'时必填。
properties.auth.kerberos.krb.path (none) String kerberos 的 krb5.conf 配置文件的绝对路径(Flink SQL提交机器的本地文件路径,如用 Flink SQL Client提交SQL任务,该路径为同节点的本地路径,如 /XXX/XXX/krb5.conf)。'properties.auth.type'='kerberos'时必填。
properties.auth.kerberos.keytab.path (none) String kerberos 的 XXX.keytab 配置文件的绝对路径(Flink SQL提交机器的本地文件路径,如用 Flink SQL Client提交SQL任务,该路径为同节点的本地路径,如 /XXX/XXX/XXX.keytab)。'properties.auth.type'='kerberos'时必填。

通过 YAML 配置

参考 Flink Sql Client 官方配置。 修改 flink 目录中的 conf/sql-client-defaults.yaml 文件。

catalogs:
- name: <catalog_name>
  type: arctic
  metastore.url: ...
  ...

DDL 语句

CREATE DATABASE

默认使用创建 catalog 时的 default-database 配置(默认值:default)。可使用下述例子创建数据库:

CREATE DATABASE [catalog_name.]arctic_db;

USE arctic_db;

DROP DATABASE

DROP DATABASE catalog_name.arctic_db

CREATE TABLE

CREATE TABLE `arctic_catalog`.`arctic_db`.`test_table` (
    id BIGINT,
    name STRING,
    op_time TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'key' = 'value'
);

目前支持 Flink Sql 建表的大多数语法,包括:

  • PARTITION BY (column1, column2, ...):配置 Flink 分区字段,但 Flink 还未支持隐藏分区
  • PRIMARY KEY (column1, column2, ...):配置主键
  • WITH ('key'='value', ...):配置 Arctic Table 的属性

目前不支持计算列、watermark 字段的配置。

PARTITIONED BY

使用 PARTITIONED BY 创建分区表。

CREATE TABLE `arctic_catalog`.`arctic_db`.`test_table` (
    id BIGINT,
    name STRING,
    op_time TIMESTAMP
) PARTITIONED BY(op_time) WITH (
    'key' = 'value'
);
Arctic 表支持隐藏分区,但 Flink 不支持函数计算的分区,因此目前通过 Flink Sql 只能创建相同值的分区。

CREATE TABLE LIKE

创建一个与已有表相同表结构、分区、表属性的表,可使用 CREATE TABLE LIKE

CREATE TABLE `arctic_catalog`.`arctic_db`.`test_table` (
    id BIGINT,
    name STRING,
    op_time TIMESTAMP
);

CREATE TABLE  `arctic_catalog`.`arctic_db`.`test_table_like` 
    LIKE `arctic_catalog`.`arctic_db`.`test_table`;
更多细节可参考 Flink create table like

DROP TABLE

DROP TABLE `arctic_catalog`.`arctic_db`.`test_table`;

SHOW 语句

SHOW DATABASES

查看当前 catalog 下所有数据库名称:

SHOW DATABASES;

SHOW TABLES

查看当前数据库的所有表名称:

SHOW TABLES;