
Spark DML

Query

Use a SELECT statement to query an Arctic table:

SELECT * FROM arctic_catalog.db.sample

Select from change table

For a table with a primary key, the ChangeStore can be queried through the .change suffix:

SELECT * FROM arctic_catalog.db.sample.change

+---+----+----+---------------+------------+--------------+
| id|name|data|_transaction_id|_file_offset|_change_action|
+---+----+----+---------------+------------+--------------+
|  1|dddd|abcd|              3|           1|        INSERT|
|  1|dddd|abcd|              3|           2|        DELETE|
+---+----+----+---------------+------------+--------------+
The result contains three additional columns:

  • _transaction_id: the transaction id allocated by AMS when the data was written. In batch mode it is allocated per SQL statement; in streaming mode it is allocated per checkpoint.
  • _file_offset: indicates the order in which rows were written within the same _transaction_id.
  • _change_action: the type of the change, either INSERT or DELETE.
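Since these three columns appear as ordinary columns in the result, they can be filtered and projected like any other column. A small sketch, using the sample output above:

```sql
-- Keep only the INSERT rows from the ChangeStore
SELECT id, name, data, _transaction_id
FROM arctic_catalog.db.sample.change
WHERE _change_action = 'INSERT'
```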

Write

INSERT OVERWRITE

INSERT OVERWRITE replaces the data in a table with the result of a query.

Spark's default overwrite mode is Static.

The Dynamic overwrite mode is enabled by setting spark.sql.sources.partitionOverwriteMode=dynamic.
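A minimal sketch of switching the current session to dynamic overwrite, using a session-level SET as supported by Spark SQL:

```sql
-- Enable dynamic partition overwrite for this session only
SET spark.sql.sources.partitionOverwriteMode = dynamic;
```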

To demonstrate the behavior of dynamic overwrite and static overwrite, define a test table with the following DDL:

CREATE TABLE arctic_catalog.db.sample (
    id int,
    data string,
    ts timestamp,
    primary key (id))
USING arctic
PARTITIONED BY (days(ts))

When Spark's overwrite mode is Dynamic, the partitions covered by the rows produced by the SELECT query are replaced.

INSERT OVERWRITE arctic_catalog.db.sample VALUES
(1, 'aaa', timestamp('2022-01-01 09:00:00')),
(2, 'bbb', timestamp('2022-01-02 09:00:00')),
(3, 'ccc', timestamp('2022-01-03 09:00:00'))

When Spark's overwrite mode is Static, the PARTITION clause is converted into a filter on the result set selected from the table. If the PARTITION clause is omitted, all partitions are replaced.

INSERT OVERWRITE arctic_catalog.db.sample
PARTITION (dt = '2021-1-1') VALUES
(1, 'aaa'), (2, 'bbb'), (3, 'ccc')

In Static mode, transforms on partition fields are not supported.

You can enable a uniqueness check on the source data's primary keys by executing set spark.sql.arctic.check-source-data-uniqueness.enabled = true in Spark SQL. If duplicate primary keys exist, the write fails with an error.
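As a sketch of what the check does (the duplicate rows below are hypothetical): with the flag enabled, a write whose source contains two rows with the same primary key is rejected instead of silently keeping one of them.

```sql
SET spark.sql.arctic.check-source-data-uniqueness.enabled = true;

-- id = 1 appears twice in the source rows, so this write
-- fails with a duplicate-primary-key error
INSERT OVERWRITE arctic_catalog.db.sample VALUES
(1, 'aaa', timestamp('2022-01-01 09:00:00')),
(1, 'bbb', timestamp('2022-01-01 10:00:00'))
```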

INSERT INTO

无主键表

To append new data to a table without a primary key, use INSERT INTO:

INSERT INTO arctic_catalog.db.sample VALUES (1, 'a'), (2, 'b')

INSERT INTO prod.db.table SELECT ...

有主键表

When adding new data to a table with a primary key, the write.upsert.enabled property controls whether the UPSERT feature is enabled.

When UPSERT is enabled, a row with the same primary key is updated if it already exists and inserted otherwise.

When UPSERT is disabled, rows are always inserted.

CREATE TABLE arctic_catalog.db.keyedTable (
    id int,
    data string,
    primary key (id))
USING arctic
TBLPROPERTIES ('write.upsert.enabled' = 'true')

INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'a'), (2, 'b')

INSERT INTO prod.db.keyedTable SELECT ...
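The effect of the setting can be sketched with the keyedTable defined above (row values are illustrative):

```sql
INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'a');
INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'b');

-- With 'write.upsert.enabled' = 'true', the second statement updates
-- the existing row, so the table holds a single row (1, 'b').
-- With upsert disabled, both rows are appended.
SELECT * FROM arctic_catalog.db.keyedTable;
```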

You can enable a uniqueness check on the source data's primary keys by executing set spark.sql.arctic.check-source-data-uniqueness.enabled = true in Spark SQL. If duplicate primary keys exist, the write fails with an error.

DELETE FROM

Arctic Spark supports the DELETE FROM syntax for deleting data from a table:

DELETE FROM arctic_catalog.db.sample
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'

DELETE FROM arctic_catalog.db.sample
WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events)

DELETE FROM arctic_catalog.db.sample AS t1
WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)

UPDATE

The UPDATE statement is supported for updating rows in a table.

An update statement uses a SELECT-style WHERE clause to match the rows to update:

UPDATE arctic_catalog.db.sample
SET c1 = 'update_c1', c2 = 'update_c2'
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'

UPDATE arctic_catalog.db.sample
SET session_time = 0, ignored = true
WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events)

UPDATE arctic_catalog.db.sample AS t1
SET order_status = 'returned'
WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)

MERGE INTO

MERGE INTO prod.db.target t   -- a target table
USING (SELECT ...) s          -- the source updates
ON t.id = s.id                -- condition to find updates for target rows
WHEN ...                      -- updates

Multiple WHEN MATCHED ... THEN ... clauses are supported, performing UPDATE, DELETE, or INSERT operations:

MERGE INTO prod.db.target t   
USING prod.db.source s       
ON t.id = s.id             
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1          
WHEN NOT MATCHED THEN INSERT *