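Hello,

The partition commit policy you configured both registers the partition in the Hive metastore and writes a _SUCCESS file into the partition directory: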

'sink.partition-commit.policy.kind' = 'metastore,success-file',

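Please check whether the _SUCCESS file has been generated in the partition directory. If it has not, the partition has not been committed yet, which is also why Hive cannot see the data: the partition metadata has not been committed to the metastore.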
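
To check many partitions at once, something like the following could help. It is only a sketch: it assumes the table directory is reachable through the local file system (for example a downloaded copy or a mounted path), and the class and method names are made up for illustration. On HDFS the same check would go through `hdfs dfs -ls` or the Hadoop FileSystem API instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink or Hive.
final class SuccessFileCheck {

    /** Lists directories that hold data files but no _SUCCESS marker. */
    static List<Path> uncommittedPartitions(Path tableRoot) throws IOException {
        try (Stream<Path> paths = Files.walk(tableRoot)) {
            return paths
                    .filter(Files::isDirectory)
                    .filter(SuccessFileCheck::containsDataFile)
                    .filter(dir -> !Files.exists(dir.resolve("_SUCCESS")))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    /** True if the directory directly contains a regular file other than _SUCCESS. */
    private static boolean containsDataFile(Path dir) {
        try (Stream<Path> children = Files.list(dir)) {
            return children.anyMatch(p -> Files.isRegularFile(p)
                    && !p.getFileName().toString().equals("_SUCCESS"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Pass the local path of the table directory as the first argument.
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        uncommittedPartitions(root).forEach(System.out::println);
    }
}
```

Any directory this prints has data files but no commit marker, so it is a candidate for "written but not yet committed".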

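As far as I know, the delay before the partition metadata is committed depends on the checkpoint interval and on the 'sink.partition-commit.delay' setting. Try waiting for the sum of the two and then check again whether Hive can query the data.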
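
As a rough illustration of that arithmetic, here is a small sketch in plain Java. It fills the 'partition.time-extractor.timestamp-pattern' with the partition values from this thread, adds the commit delay to get the time the watermark has to pass, and adds the checkpoint interval for a rough wait estimate. The class name, the method names and the `yyyyMMdd HH:mm:ss` formatter are my own assumptions chosen to fit the `dt=20201101` values shown in the thread; this is not Flink's extractor code.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for reasoning about commit timing; not a Flink API.
final class PartitionCommitEstimate {

    // Assumption: fits dt values such as "20201101"; not Flink's own parser.
    static final DateTimeFormatter PARTITION_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss");

    /** Replaces each "$name" placeholder with the matching partition value. */
    static String fillPattern(String pattern, Map<String, String> partitionValues) {
        String filled = pattern;
        for (Map.Entry<String, String> e : partitionValues.entrySet()) {
            filled = filled.replace("$" + e.getKey(), e.getValue());
        }
        return filled;
    }

    /** Partition time plus commit delay: the point the watermark has to pass. */
    static LocalDateTime commitEligibleAt(
            String pattern, Map<String, String> partitionValues, Duration commitDelay) {
        LocalDateTime partitionTime =
                LocalDateTime.parse(fillPattern(pattern, partitionValues), PARTITION_TIME_FORMAT);
        return partitionTime.plus(commitDelay);
    }

    /** Rough wait suggested above: checkpoint interval plus commit delay. */
    static Duration roughWait(Duration checkpointInterval, Duration commitDelay) {
        return checkpointInterval.plus(commitDelay);
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("dt", "20201101");
        values.put("hr", "16");
        values.put("mi", "30");

        Duration delay = Duration.ofMinutes(1);       // 'sink.partition-commit.delay' = '1 min'
        Duration checkpoint = Duration.ofMinutes(1);  // enableCheckpointing(60000)

        System.out.println(commitEligibleAt("$dt $hr:$mi:00", values, delay));
        System.out.println(roughWait(checkpoint, delay));
    }
}
```

With the settings in this thread (1 min delay, 1 min checkpoint interval) the rough wait comes out at about two minutes, on top of however far the watermark lags behind.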

Best regards,


________________________________
From: 陈帅 <[email protected]>
Sent: November 1, 2020, 5:36 PM
To: Jark Wu <[email protected]>
Cc: user-zh <[email protected]>
Subject: Re: Question about flink mysql cdc + hive streaming

Finally, after running "msck repair table team;" in the Hive shell, I was able to query the written data. Can't Flink Hive streaming register Hive partitions automatically? Or am I using it incorrectly?
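
For reference, a single missing partition can also be registered by hand with Hive's `ALTER TABLE ... ADD IF NOT EXISTS PARTITION` statement instead of a full `msck repair`. The sketch below only builds that statement from a partition path like the one in this thread; the class and method names are hypothetical, and it assumes partition values contain no quote characters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that builds Hive DDL text; it does not talk to Hive.
final class AddPartitionDdl {

    /** Turns "dt=20201101/hr=16/mi=30" into an ADD PARTITION statement. */
    static String fromPartitionPath(String table, String relativePath) {
        List<String> specs = new ArrayList<>();
        for (String segment : relativePath.split("/")) {
            if (segment.isEmpty()) {
                continue; // tolerate leading or trailing slashes
            }
            int eq = segment.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Not a key=value segment: " + segment);
            }
            // Assumption: values contain no single quotes, so no escaping is done.
            specs.add(segment.substring(0, eq) + "='" + segment.substring(eq + 1) + "'");
        }
        return "ALTER TABLE " + table + " ADD IF NOT EXISTS PARTITION ("
                + String.join(", ", specs) + ")";
    }

    public static void main(String[] args) {
        System.out.println(fromPartitionPath("team", "dt=20201101/hr=16/mi=30"));
    }
}
```

This is only a manual workaround; it does not explain why the metastore commit from the sink did not happen.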

陈帅 <[email protected]> wrote on Sun, Nov 1, 2020 at 5:24 PM:

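> I switched the Hive table to TEXTFILE storage so that I could download the Hive files and inspect their contents:
> ") STORED AS TEXTFILE TBLPROPERTIES ("
>
> This is the resulting Hive table DDL: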
>
> hive> show create table team;
> OK
> CREATE TABLE `team`(
>   `team_id` int,
>   `team_name` string,
>   `create_time` string,
>   `update_time` string,
>   `op` string)
> PARTITIONED BY (
>   `dt` string,
>   `hr` string,
>   `mi` string)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.mapred.TextInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> LOCATION
>   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> TBLPROPERTIES (
>   'is_generic'='false',
>   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
>   'sink.partition-commit.delay'='1 min',
>   'sink.partition-commit.policy.kind'='metastore,success-file',
>   'sink.partition-commit.trigger'='partition-time',
>   'transient_lastDdlTime'='1604222266')
> Time taken: 0.252 seconds, Fetched: 25 row(s)
>
> In addition, the downloaded Hive file contains the following:
> 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT
>
> The query still returns no results:
> hive> select * from team;
> OK
> Time taken: 0.326 seconds
>
> 陈帅 <[email protected]> wrote on Sun, Nov 1, 2020 at 5:10 PM:
>
>>
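>> Before I added the watermark and the partitioning, the Hive files were written and could be queried. After enabling partitioning, the Hive files are still generated but cannot be queried, so I don't think this has much to do with whether a watermark is set.
>> The generated Hive partition file paths look like /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3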
>>
>> 陈帅 <[email protected]> wrote on Sun, Nov 1, 2020 at 4:43 PM:
>>
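>>> I checked, and the Hive files are generated according to the partitions I defined. Following your suggestion I added a watermark to the ds2 stream; after running the job the Hive files were generated again, but the data still cannot be queried from the Hive shell.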
>>>
>>> import com.alibaba.fastjson.JSON;
>>> import com.alibaba.fastjson.JSONObject;
>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.common.typeinfo.Types;
>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.SqlDialect;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>> import org.apache.flink.types.Row;
>>> import org.apache.flink.types.RowKind;
>>>
>>> import java.time.Duration;
>>> import java.time.Instant;
>>> import java.time.LocalDateTime;
>>> import java.time.ZoneId;
>>> import java.time.format.DateTimeFormatter;
>>> import java.util.Properties;
>>>
>>> public class MysqlCDC2Hive {
>>>
>>>     private static final DateTimeFormatter dtf =
>>>             DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         StreamExecutionEnvironment streamEnv =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>         streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>         streamEnv.setParallelism(3);
>>>         streamEnv.enableCheckpointing(60000);
>>>
>>>         EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>>                 .useBlinkPlanner()
>>>                 .inStreamingMode()
>>>                 .build();
>>>         StreamTableEnvironment tableEnv =
>>>                 StreamTableEnvironment.create(streamEnv, tableEnvSettings);
>>>         tableEnv.getConfig().getConfiguration().set(
>>>                 ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>>>                 CheckpointingMode.EXACTLY_ONCE);
>>>         tableEnv.getConfig().getConfiguration().set(
>>>                 ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>>>                 Duration.ofMinutes(1));
>>>
>>>         String catalogName = "hive_catalog";
>>>         HiveCatalog catalog = new HiveCatalog(
>>>                 catalogName,
>>>                 "default",
>>>                 "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
>>>                 "2.3.4"
>>>         );
>>>         tableEnv.registerCatalog(catalogName, catalog);
>>>         tableEnv.useCatalog(catalogName);
>>>
>>>         MyDateFormat2 myDateFormat = new MyDateFormat2();
>>>         tableEnv.registerFunction("my_date_format", myDateFormat);
>>>
>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
>>>         tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
>>>         tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
>>>                 "    team_id INT,\n" +
>>>                 "    team_name STRING,\n" +
>>>                 "    create_time TIMESTAMP,\n" +
>>>                 "    update_time TIMESTAMP,\n" +
>>>                 "    proctime as proctime()\n" +
>>>                 ") WITH (\n" +
>>>                 "  'connector' = 'mysql-cdc',\n" +
>>>                 "  'hostname' = 'localhost',\n" +
>>>                 "  'port' = '3306',\n" +
>>>                 "  'username' = 'root',\n" +
>>>                 "  'password' = 'root',\n" +
>>>                 "  'database-name' = 'test',\n" +
>>>                 "  'table-name' = 'team'\n" +
>>>                 ")");
>>>
>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
>>>         tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
>>>         tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
>>>                 "  team_id INT,\n" +
>>>                 "  team_name STRING,\n" +
>>>                 "  create_time TIMESTAMP,\n" +
>>>                 "  update_time TIMESTAMP\n" +
>>>                 ") WITH (\n" +
>>>                 "  'connector' = 'kafka',\n" +
>>>                 "  'topic' = 'team',\n" +
>>>                 "  'scan.startup.mode' = 'earliest-offset',\n" +
>>>                 "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
>>>                 "  'format' = 'changelog-json'\n" +
>>>                 ")");
>>>
>>>         tableEnv.executeSql("INSERT INTO kafka.team \n" +
>>>                 "SELECT team_id, team_name, create_time, update_time \n" +
>>>                 "FROM cdc.team");
>>>
>>>         // Define a stream that carries the op field
>>>         Properties properties = new Properties();
>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>         properties.setProperty("group.id", "test1`");
>>>
>>>         FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
>>>                 "team",
>>>                 new SimpleStringSchema(),
>>>                 properties
>>>         ).setStartFromEarliest();
>>>
>>>         DataStream<String> ds = streamEnv.addSource(consumer);
>>>
>>>         String[] fieldNames =
>>>                 {"team_id", "team_name", "create_time", "update_time", "op"};
>>>         TypeInformation[] types =
>>>                 {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
>>>         DataStream<Row> ds2 = ds.map(str -> {
>>>             JSONObject jsonObject = JSON.parseObject(str);
>>>             String op = jsonObject.getString("op");
>>>             JSONObject data = jsonObject.getJSONObject("data");
>>>             int arity = fieldNames.length;
>>>             Row row = new Row(arity);
>>>             row.setField(0, data.get("team_id"));
>>>             row.setField(1, data.get("team_name"));
>>>             row.setField(2, data.get("create_time"));
>>>             row.setField(3, data.get("update_time"));
>>>             String operation = getOperation(op);
>>>             row.setField(4, operation);
>>>
>>>             return row;
>>>         }, new RowTypeInfo(types, fieldNames))
>>>         .assignTimestampsAndWatermarks(
>>>                 new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) {
>>>             @Override
>>>             public long extractTimestamp(Row row) {
>>>                 String dt = (String) row.getField(2);
>>>                 LocalDateTime ldt = LocalDateTime.parse(dt, dtf);
>>>                 Instant instant = ldt.atZone(ZoneId.systemDefault()).toInstant();
>>>                 long timeInMillis = instant.toEpochMilli();
>>>                 return timeInMillis;
>>>             }
>>>         });
>>>
>>>         tableEnv.registerDataStream("merged_team", ds2);
>>>
>>>         tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>>>
>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
>>>         tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
>>>
>>>         tableEnv.executeSql("CREATE TABLE ods.team (\n" +
>>>                 "  team_id INT,\n" +
>>>                 "  team_name STRING,\n" +
>>>                 "  create_time STRING,\n" +
>>>                 "  update_time STRING,\n" +
>>>                 "  op STRING\n" +
>>>                 ") PARTITIONED BY (\n" +
>>>                 "    dt STRING,\n" +
>>>                 "    hr STRING,\n" +
>>>                 "    mi STRING\n" +
>>>                 ") STORED AS PARQUET TBLPROPERTIES (\n" +
>>>                 "  'sink.partition-commit.trigger' = 'partition-time',\n" +
>>>                 "  'sink.partition-commit.delay' = '1 min',\n" +
>>>                 "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
>>>                 "  'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00'\n" +
>>>                 ")");
>>>
>>>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>>>         tableEnv.executeSql("INSERT INTO ods.team \n" +
>>>                 "SELECT team_id, team_name, create_time, update_time, op, \n" +
>>>                 " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as dt, \n" +
>>>                 " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as hr, \n" +
>>>                 " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as mi \n" +
>>>                 "FROM merged_team");
>>>         tableEnv.execute("MysqlCDC2Hive2");
>>>
>>>         streamEnv.execute("");
>>>     }
>>>
>>>     private static String getOperation(String op) {
>>>         String operation = "INSERT";
>>>         for (RowKind rk : RowKind.values()) {
>>>             if (rk.shortString().equals(op)) {
>>>                 switch (rk) {
>>>                     case UPDATE_BEFORE:
>>>                     case UPDATE_AFTER:
>>>                         operation = "UPDATE";
>>>                         break;
>>>                     case DELETE:
>>>                         operation = "DELETE";
>>>                         break;
>>>                     case INSERT:
>>>                     default:
>>>                         operation = "INSERT";
>>>                         break;
>>>                 }
>>>                 break;
>>>             }
>>>         }
>>>         return operation;
>>>     }
>>> }
>>>
>>> Jark Wu <[email protected]> wrote on Sun, Nov 1, 2020 at 11:04 AM:
>>>
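>>>> Could you check whether the Hive files were generated properly?
>>>>
>>>> Looking at your code above, the kafka -> hive part of the pipeline has no watermark, and the "partition-time" trigger policy is driven by the watermark [1], so that may be why there is no data in Hive.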
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger
>>>>
>>>> On Sat, 31 Oct 2020 at 17:25, 陈帅 <[email protected]> wrote:
>>>>
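>>>>> Thanks, Jark, for the detailed answer. I tried the approach you suggested and ran into a problem: without Hive partitioning, both writing and reading work fine, but once time-based partitioning is enabled on the Hive table, the write succeeds yet no data can be found from the Hive shell, even though the table schema is correct. (In the code below I have commented out the partitioning.) Could you help me see what I got wrong?
>>>>>
>>>>> A sample cdc -> kafka message looks like this:
>>>>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}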
>>>>>
>>>>> import com.alibaba.fastjson.JSON;
>>>>> import com.alibaba.fastjson.JSONObject;
>>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>>> import org.apache.flink.api.common.typeinfo.Types;
>>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>> import org.apache.flink.table.api.SqlDialect;
>>>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>>>> import org.apache.flink.types.Row;
>>>>> import org.apache.flink.types.RowKind;
>>>>>
>>>>> import java.time.Duration;
>>>>> import java.util.Properties;
>>>>>
>>>>> public class MysqlCDC2Hive {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         StreamExecutionEnvironment streamEnv =
>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>         streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>         streamEnv.setParallelism(3);
>>>>>         streamEnv.enableCheckpointing(60000);
>>>>>
>>>>>         EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>>>>>                 .useBlinkPlanner()
>>>>>                 .inStreamingMode()
>>>>>                 .build();
>>>>>         StreamTableEnvironment tableEnv =
>>>>>                 StreamTableEnvironment.create(streamEnv, tableEnvSettings);
>>>>>         tableEnv.getConfig().getConfiguration().set(
>>>>>                 ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>>>>>                 CheckpointingMode.EXACTLY_ONCE);
>>>>>         tableEnv.getConfig().getConfiguration().set(
>>>>>                 ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>>>>>                 Duration.ofMinutes(1));
>>>>>
>>>>>         String catalogName = "hive_catalog";
>>>>>         HiveCatalog catalog = new HiveCatalog(
>>>>>                 catalogName,
>>>>>                 "default",
>>>>>                 "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
>>>>>                 "2.3.4"
>>>>>         );
>>>>>         tableEnv.registerCatalog(catalogName, catalog);
>>>>>         tableEnv.useCatalog(catalogName);
>>>>>
>>>>>         MyDateFormat2 myDateFormat = new MyDateFormat2();
>>>>>         tableEnv.registerFunction("my_date_format", myDateFormat);
>>>>>
>>>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
>>>>>         tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
>>>>>         tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
>>>>>                 "    team_id INT,\n" +
>>>>>                 "    team_name STRING,\n" +
>>>>>                 "    create_time TIMESTAMP,\n" +
>>>>>                 "    update_time TIMESTAMP,\n" +
>>>>>                 "    proctime as proctime()\n" +
>>>>>                 ") WITH (\n" +
>>>>>                 "  'connector' = 'mysql-cdc',\n" +
>>>>>                 "  'hostname' = 'localhost',\n" +
>>>>>                 "  'port' = '3306',\n" +
>>>>>                 "  'username' = 'root',\n" +
>>>>>                 "  'password' = 'root',\n" +
>>>>>                 "  'database-name' = 'test',\n" +
>>>>>                 "  'table-name' = 'team'\n" +
>>>>>                 ")");
>>>>>
>>>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
>>>>>         tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
>>>>>         tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
>>>>>                 "  team_id INT,\n" +
>>>>>                 "  team_name STRING,\n" +
>>>>>                 "  create_time TIMESTAMP,\n" +
>>>>>                 "  update_time TIMESTAMP\n" +
>>>>>                 ") WITH (\n" +
>>>>>                 "  'connector' = 'kafka',\n" +
>>>>>                 "  'topic' = 'team',\n" +
>>>>>                 "  'scan.startup.mode' = 'earliest-offset',\n" +
>>>>>                 "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
>>>>>                 "  'format' = 'changelog-json'\n" +
>>>>>                 ")");
>>>>>
>>>>>         tableEnv.executeSql("INSERT INTO kafka.team \n" +
>>>>>                 "SELECT team_id, team_name, create_time, update_time \n" +
>>>>>                 "FROM cdc.team");
>>>>>
>>>>>         // Define a stream that carries the op field
>>>>>         Properties properties = new Properties();
>>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>>         properties.setProperty("group.id", "test");
>>>>>
>>>>>         FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
>>>>>                 "team",
>>>>>                 new SimpleStringSchema(),
>>>>>                 properties
>>>>>         ).setStartFromEarliest();
>>>>>
>>>>>         DataStream<String> ds = streamEnv.addSource(consumer);
>>>>>
>>>>>         String[] fieldNames =
>>>>>                 {"team_id", "team_name", "create_time", "update_time", "op"};
>>>>>         TypeInformation[] types =
>>>>>                 {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
>>>>>         DataStream<Row> ds2 = ds.map(str -> {
>>>>>             JSONObject jsonObject = JSON.parseObject(str);
>>>>>             String op = jsonObject.getString("op");
>>>>>             JSONObject data = jsonObject.getJSONObject("data");
>>>>>             int arity = fieldNames.length;
>>>>>             Row row = new Row(arity);
>>>>>             row.setField(0, data.get("team_id"));
>>>>>             row.setField(1, data.get("team_name"));
>>>>>             row.setField(2, data.get("create_time"));
>>>>>             row.setField(3, data.get("update_time"));
>>>>>             String operation = getOperation(op);
>>>>>             row.setField(4, operation);
>>>>>
>>>>>             return row;
>>>>>         }, new RowTypeInfo(types, fieldNames));
>>>>>
>>>>>         tableEnv.registerDataStream("merged_team", ds2);
>>>>>
>>>>>         tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>>>>>
>>>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
>>>>>         tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
>>>>>
>>>>>         tableEnv.executeSql("CREATE TABLE ods.team (\n" +
>>>>>                 "  team_id INT,\n" +
>>>>>                 "  team_name STRING,\n" +
>>>>>                 "  create_time STRING,\n" +
>>>>>                 "  update_time STRING,\n" +
>>>>>                 "  op STRING\n" +
>>>>> //                ") PARTITIONED BY (\n" +
>>>>> //                "    ts_date STRING,\n" +
>>>>> //                "    ts_hour STRING,\n" +
>>>>> //                "    ts_minute STRING\n" +
>>>>>                 ") STORED AS PARQUET TBLPROPERTIES (\n" +
>>>>>                 "  'sink.partition-commit.trigger' = 'partition-time',\n" +
>>>>>                 "  'sink.partition-commit.delay' = '1 min',\n" +
>>>>>                 "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
>>>>>                 "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
>>>>>                 ")");
>>>>>
>>>>>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>>>>>         tableEnv.executeSql("INSERT INTO ods.team \n" +
>>>>>                 "SELECT team_id, team_name, create_time, update_time, op \n" +
>>>>> //                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" +
>>>>> //                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as ts_hour, \n" +
>>>>> //                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as ts_minute \n" +
>>>>>                 "FROM merged_team");
>>>>>         tableEnv.execute("MysqlCDC2Hive2");
>>>>>
>>>>>         streamEnv.execute("");
>>>>>     }
>>>>>
>>>>>     private static String getOperation(String op) {
>>>>>         String operation = "INSERT";
>>>>>         for (RowKind rk : RowKind.values()) {
>>>>>             if (rk.shortString().equals(op)) {
>>>>>                 switch (rk) {
>>>>>                     case UPDATE_BEFORE:
>>>>>                     case UPDATE_AFTER:
>>>>>                         operation = "UPDATE";
>>>>>                         break;
>>>>>                     case DELETE:
>>>>>                         operation = "DELETE";
>>>>>                         break;
>>>>>                     case INSERT:
>>>>>                     default:
>>>>>                         operation = "INSERT";
>>>>>                         break;
>>>>>                 }
>>>>>                 break;
>>>>>             }
>>>>>         }
>>>>>         return operation;
>>>>>     }
>>>>> }
>>>>>
>>>>> Jark Wu <[email protected]> wrote on Sat, Oct 31, 2020 at 1:45 PM:
>>>>>
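>>>>>> 1. Yes. Hive currently cannot consume a changelog directly, mainly because Hive's support for CDC is not very good. Even Hive's ACID/transaction feature is rarely used, because it integrates poorly with other compute engines.
>>>>>>
>>>>>> 2. The cdc -> kafka -> hive streaming approach is feasible. However, kafka -> hive streaming is essentially a raw data sync: what lands in Hive is still the CDC log content, with no real-time merging, so users have to write their own queries to merge it in Hive. For the merge process you can refer to this article [1].
>>>>>>
>>>>>> 3. You can use ts + INTERVAL '8' HOUR
>>>>>>
>>>>>> PS: In 1.12 we plan to let Hive write changelog data directly, so that cdc can go straight to hive streaming without Kafka in between. Once the data is in Hive, though, you would still need to write a separate query to merge it in real time.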
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[email protected]> wrote:
>>>>>>
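>>>>>>> Hive 3 can, Hive 2 cannot. Switching to Kafka probably won't help either, since before Hive 3 modifying data in the warehouse was generally not supported. I'm not sure my answer is right, so corrections are welcome.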
>>>>>>>
>>>>>>>
>>>>>>> 罗显宴
>>>>>>> Email: [email protected]
>>>>>>>
>>>>>>> On Oct 31, 2020 at 12:06, 陈帅 wrote:
>>>>>>> I want to use Flink SQL's mysql-cdc connector to sync MySQL table data into Hive in real time. Running the job throws:
>>>>>>>
>>>>>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>>>>>> AppendStreamTableSink doesn't support consuming update and delete
>>>>>>> changes
>>>>>>> which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
>>>>>>> team]], fields=[team_id, team_name, create_time, update_time])
>>>>>>>
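>>>>>>> My questions:
>>>>>>> 1. Is this because Hive 2 does not support delete/update? Would switching to Hive 3 make it work?
>>>>>>> 2. To support this scenario, do I need a Kafka layer in between (using the changelog-json format), i.e. cdc -> kafka and then kafka -> hive streaming? Thanks!
>>>>>>> 3. The time produced by the DATE_FORMAT function is in UTC. How can I convert it to GMT+8? Is a UDF the only way?
>>>>>>>
>>>>>>> The SQL statements are as follows: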
>>>>>>>
>>>>>>> CREATE DATABASE IF NOT EXISTS cdc
>>>>>>>
>>>>>>> DROP TABLE IF EXISTS cdc.team
>>>>>>>
>>>>>>> CREATE TABLE team(
>>>>>>>    team_id BIGINT,
>>>>>>>    team_name STRING,
>>>>>>>    create_time TIMESTAMP,
>>>>>>>    update_time TIMESTAMP,
>>>>>>> proctime as proctime()
>>>>>>> ) WITH (
>>>>>>>  'connector' = 'mysql-cdc',
>>>>>>>  'hostname' = 'localhost',
>>>>>>>  'port' = '3306',
>>>>>>>  'username' = 'root',
>>>>>>>  'password' = 'root',
>>>>>>>  'database-name' = 'test',
>>>>>>>  'table-name' = 'team'
>>>>>>> )
>>>>>>>
>>>>>>> CREATE DATABASE IF NOT EXISTS ods
>>>>>>>
>>>>>>> DROP TABLE IF EXISTS ods.team
>>>>>>>
>>>>>>> CREATE TABLE ods.team (
>>>>>>>  team_id BIGINT,
>>>>>>>  team_name STRING,
>>>>>>>  create_time TIMESTAMP,
>>>>>>>  update_time TIMESTAMP
>>>>>>> ) PARTITIONED BY (
>>>>>>>  ts_date STRING,
>>>>>>>  ts_hour STRING,
>>>>>>>  ts_minute STRING
>>>>>>> ) STORED AS PARQUET TBLPROPERTIES (
>>>>>>>  'sink.partition-commit.trigger' = 'partition-time',
>>>>>>>  'sink.partition-commit.delay' = '1 min',
>>>>>>>  'sink.partition-commit.policy.kind' = 'metastore,success-file',
>>>>>>>  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
>>>>>>> )
>>>>>>>
>>>>>>> INSERT INTO ods.team
>>>>>>> SELECT team_id, team_name, create_time, update_time,
>>>>>>>  my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'),
>>>>>>>  my_date_format(create_time,'HH', 'Asia/Shanghai'),
>>>>>>>  my_date_format(create_time,'mm', 'Asia/Shanghai')
>>>>>>> FROM cdc.team
>>>>>>>
>>>>>>
