1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是  hive
ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。

2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming
相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive
中进行合并。merge过程可以参考这篇文章[1]。

3. 你可以 ts + INTERVAL '8' HOUR

PS: 在1.12中,我们计划让 hive 也能直接写 changelog 数据,这样的话 cdc 可以直接 -> hive
streaming,不需要中间的 kafka。 不过到了 hive 中后,仍然需要另外写 query 将数据做实时merge。

Best,
Jark

On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[email protected]> wrote:

> hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:[email protected]
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年10月31日 12:06,陈帅 写道:
> 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> AppendStreamTableSink doesn't support consuming update and delete changes
> which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
> team]], fields=[team_id, team_name, create_time, update_time])
>
> 我的问题:
> 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢?
> 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> kafka,然后kafka
> -> hive streaming? 谢谢!
> 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么?
>
> sql语句如下
>
> CREATE DATABASE IF NOT EXISTS cdc
>
> DROP TABLE IF EXISTS cdc.team
>
> CREATE TABLE team(
>    team_id BIGINT,
>    team_name STRING,
>    create_time TIMESTAMP,
>    update_time TIMESTAMP,
> proctime as proctime()
> ) WITH (
>  'connector' = 'mysql-cdc',
>  'hostname' = 'localhost',
>  'port' = '3306',
>  'username' = 'root',
>  'password' = 'root',
>  'database-name' = 'test',
>  'table-name' = 'team'
> )
>
> CREATE DATABASE IF NOT EXISTS ods
>
> DROP TABLE IF EXISTS ods.team
>
> CREATE TABLE ods.team (
>  team_id BIGINT,
>  team_name STRING,
>  create_time TIMESTAMP,
>  update_time TIMESTAMP,
> ) PARTITIONED BY (
>  ts_date STRING,
>  ts_hour STRING,
>  ts_minute STRING,
> ) STORED AS PARQUET TBLPROPERTIES (
>  'sink.partition-commit.trigger' = 'partition-time',
>  'sink.partition-commit.delay' = '1 min',
>  'sink.partition-commit.policy.kind' = 'metastore,success-file',
>  'partition.time-extractor.timestamp-pattern' = '$ts_date
> $ts_hour:$ts_minute:00'
> )
>
> INSERT INTO ods.team
> SELECT team_id, team_name, create_time, update_time,
>  my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'),
>  my_date_format(create_time,'HH', 'Asia/Shanghai'),
>  my_date_format(create_time,'mm', 'Asia/Shanghai')
> FROM cdc.team
>

回复