Hive 3 supports it, Hive 2 does not, and inserting Kafka in between won't help either; before Hive 3, updates and deletes against the warehouse were generally not supported. I'm not sure this answer is correct, corrections welcome.


罗显宴
Email: [email protected]

On 2020-10-31 12:06, 陈帅 wrote:
I want to use Flink SQL's mysql-cdc connector to sync MySQL table data into Hive in real time, but running it throws:

Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
team]], fields=[team_id, team_name, create_time, update_time])

My questions:
1. Is this because Hive 2 does not support delete/update? Would switching to Hive 3 make it work?
2. To support this scenario, do I need to add a Kafka layer in between (using the changelog-json format), i.e. cdc -> kafka, then kafka -> hive streaming? Thanks!
3. The time produced by the DATE_FORMAT function is in UTC; how can I convert it to GMT+8? Is a UDF the only way?
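For reference, the intermediate Kafka table described in question 2 might be declared roughly as follows. This is only a sketch: the table name, topic, and broker address are placeholders, and it assumes the changelog-json format from the flink-cdc-connectors project is on the classpath.

```sql
-- Hypothetical changelog table for the cdc -> kafka leg;
-- topic and bootstrap servers are assumptions, not from the original mail.
CREATE TABLE kafka_team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP(3),
  update_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'team_changelog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'changelog-json'
);
```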

The SQL statements are as follows:

CREATE DATABASE IF NOT EXISTS cdc;

DROP TABLE IF EXISTS cdc.team;

CREATE TABLE cdc.team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP,
  update_time TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'test',
  'table-name' = 'team'
);

CREATE DATABASE IF NOT EXISTS ods;

DROP TABLE IF EXISTS ods.team;

CREATE TABLE ods.team (
  team_id BIGINT,
  team_name STRING,
  create_time TIMESTAMP,
  update_time TIMESTAMP
) PARTITIONED BY (
  ts_date STRING,
  ts_hour STRING,
  ts_minute STRING
) STORED AS PARQUET TBLPROPERTIES (
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
);

INSERT INTO ods.team
SELECT team_id, team_name, create_time, update_time,
  my_date_format(create_time, 'yyyy-MM-dd', 'Asia/Shanghai'),
  my_date_format(create_time, 'HH', 'Asia/Shanghai'),
  my_date_format(create_time, 'mm', 'Asia/Shanghai')
FROM cdc.team;
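On question 3, Flink SQL's built-in CONVERT_TZ might avoid the custom my_date_format UDF; a sketch along these lines (column names follow the tables above, not verified end to end):

```sql
-- CONVERT_TZ takes a 'yyyy-MM-dd HH:mm:ss' string plus source and target
-- time zones, so the TIMESTAMP is formatted to a string first, shifted
-- from UTC to Asia/Shanghai, then the partition fields are re-extracted.
SELECT
  DATE_FORMAT(CONVERT_TZ(DATE_FORMAT(create_time, 'yyyy-MM-dd HH:mm:ss'),
                         'UTC', 'Asia/Shanghai'), 'yyyy-MM-dd') AS ts_date,
  DATE_FORMAT(CONVERT_TZ(DATE_FORMAT(create_time, 'yyyy-MM-dd HH:mm:ss'),
                         'UTC', 'Asia/Shanghai'), 'HH') AS ts_hour
FROM cdc.team;
```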
