我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛
Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
team]], fields=[team_id, team_name, create_time, update_time])
我的问题:
1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢?
2. 如果要支持这种场景,是不是中间需要加一层 Kafka 作为介质(通过 changelog-json 格式),即 cdc -> kafka,然后 kafka -> hive streaming?谢谢!
3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么?
sql语句如下
-- Database for the CDC source tables; IF NOT EXISTS / IF EXISTS keep re-runs idempotent.
-- Added the statement terminators the original script was missing.
CREATE DATABASE IF NOT EXISTS cdc;
DROP TABLE IF EXISTS cdc.team;
-- MySQL CDC source table.
-- FIX: the original created the unqualified name `team` (placed in the session's
-- current database) even though the DROP above targets cdc.team — qualify the
-- name so DROP and CREATE act on the same object.
CREATE TABLE cdc.team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP,
  update_time TIMESTAMP,
  -- processing-time attribute (computed column), usable for temporal joins/windows
  proctime AS PROCTIME()
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'test',
  'table-name' = 'team'
);
-- Database for the Hive sink table; added the missing statement terminators.
CREATE DATABASE IF NOT EXISTS ods;
DROP TABLE IF EXISTS ods.team;
-- Hive streaming sink table (must be executed in Hive dialect).
-- FIXES vs. original:
--   * removed the illegal trailing commas after `update_time TIMESTAMP` and
--     `ts_minute STRING` (syntax errors);
--   * the partition.time-extractor pattern must be a single-line string
--     literal — the original broke it across two lines.
CREATE TABLE ods.team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP,
  update_time TIMESTAMP
) PARTITIONED BY (
  ts_date STRING,
  ts_hour STRING,
  ts_minute STRING
) STORED AS PARQUET TBLPROPERTIES (
  -- commit a partition once the watermark passes partition-time + delay
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 min',
  -- register the partition in the metastore and drop a _SUCCESS file
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  -- how to reconstruct a timestamp from the partition column values
  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
);
-- Stream rows from the CDC source into the Hive sink. Columns map by
-- POSITION: the last three SELECT expressions fill the ts_date / ts_hour /
-- ts_minute partition columns declared on ods.team — keep the order in sync
-- with the sink DDL.
INSERT INTO ods.team
SELECT team_id, team_name, create_time, update_time,
-- my_date_format is a user-defined function not visible in this file;
-- presumably it formats create_time with the given pattern in the given
-- time zone (Asia/Shanghai = GMT+8) — confirm against the UDF definition.
my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'),
my_date_format(create_time,'HH', 'Asia/Shanghai'),
my_date_format(create_time,'mm', 'Asia/Shanghai')
FROM cdc.team