这个utc时间怎么设置,不能查看到hive数据的根本原因是 分区信息没有更新到metastore ; 你会发现文件生成了但是没有 _SUCCESS文件; 但是这样指定也不行?? tEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));
它的增删改只是在hive中对数据做了标记;后面可以通过join来处理数据 [email protected] 发件人: [email protected] 发送时间: 2020-11-02 13:37 收件人: user-zh 主题: Re: Re: flink mysql cdc + hive streaming疑问 你好! 看到你代码里,将增删改信息当做一个字段存到了Hive表中,那么到最后这些操作是怎么合并的呢? 发件人: Rui Li 发送时间: 2020-11-02 10:38 收件人: user-zh 抄送: Jark Wu 主题: Re: flink mysql cdc + hive streaming疑问 Hi, 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下 LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier); LOG.info("Committed partition {} to metastore", partitionSpec); LOG.info("Committed partition {} with success file", context.partitionSpec()); On Sun, Nov 1, 2020 at 5:36 PM 陈帅 <[email protected]> wrote: > 最后,在hive shell中执行 “msck repair table team;” 命令后就能查询到写的数据了,难道flink hive > streaming不能自动注册hive分区吗?还是我使用的姿势不对? > > 陈帅 <[email protected]> 于2020年11月1日周日 下午5:24写道: > > > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容 > > ") STORED AS TEXTFILE TBLPROPERTIES (" > > > > 这是生成的hive表建表语句 > > > > hive> show create table team; > > OK > > CREATE TABLE `team`( > > `team_id` int, > > `team_name` string, > > `create_time` string, > > `update_time` string, > > `op` string) > > PARTITIONED BY ( > > `dt` string, > > `hr` string, > > `mi` string) > > ROW FORMAT SERDE > > 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' > > STORED AS INPUTFORMAT > > 'org.apache.hadoop.mapred.TextInputFormat' > > OUTPUTFORMAT > > 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' > > LOCATION > > 'hdfs://localhost:9000/user/hive/warehouse/ods.db/team' > > TBLPROPERTIES ( > > 'is_generic'='false', > > 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', > > 'sink.partition-commit.delay'='1 min', > > 'sink.partition-commit.policy.kind'='metastore,success-file', > > 'sink.partition-commit.trigger'='partition-time', > > 'transient_lastDdlTime'='1604222266') > > Time taken: 0.252 seconds, Fetched: 25 row(s) > > > > 另外,下载了hive文件内容如下 > > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 > 11:25:38<0x01>INSERT > > > > 还是查询不到结果 > > hive> select * from team; > > OK > > Time taken: 0.326 seconds > > > > 陈帅 <[email protected]> 于2020年11月1日周日 下午5:10写道: > > > >> > >> > 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。 > >> 生成的hive分区文件路径类似于 > /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/ > >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3 > >> > >> 陈帅 <[email protected]> 于2020年11月1日周日 下午4:43写道: > >> > >>> > 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive > >>> shell查不到数据。 > >>> > >>> import com.alibaba.fastjson.JSON; > >>> import com.alibaba.fastjson.JSONObject; > >>> import org.apache.flink.api.common.serialization.SimpleStringSchema; > >>> import org.apache.flink.api.common.typeinfo.TypeInformation; > >>> import org.apache.flink.api.common.typeinfo.Types; > >>> import org.apache.flink.api.java.typeutils.RowTypeInfo; > >>> import org.apache.flink.streaming.api.CheckpointingMode; > >>> import org.apache.flink.streaming.api.TimeCharacteristic; > >>> import org.apache.flink.streaming.api.datastream.DataStream; > >>> import > >>> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; > >>> import > >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > >>> import > >>> > org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; > >>> import org.apache.flink.streaming.api.windowing.time.Time; > >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > >>> import > >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; > >>> import org.apache.flink.table.api.EnvironmentSettings; > >>> import org.apache.flink.table.api.SqlDialect; > >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >>> import org.apache.flink.table.catalog.hive.HiveCatalog; > >>> import org.apache.flink.types.Row; > >>> import org.apache.flink.types.RowKind; > >>> > >>> import java.time.Duration; > >>> import java.time.Instant; > >>> import java.time.LocalDateTime; > >>> import java.time.ZoneId; > >>> import java.time.format.DateTimeFormatter; > >>> import java.util.Properties; > >>> > >>> public class MysqlCDC2Hive { > >>> > >>> private static final DateTimeFormatter dtf = > >>> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); > >>> > >>> public static void main(String[] args) throws Exception { > >>> StreamExecutionEnvironment streamEnv = > >>> StreamExecutionEnvironment.getExecutionEnvironment(); > >>> > >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > >>> streamEnv.setParallelism(3); > >>> streamEnv.enableCheckpointing(60000); > >>> > >>> EnvironmentSettings tableEnvSettings = > >>> EnvironmentSettings.newInstance() > >>> .useBlinkPlanner() > >>> .inStreamingMode() > >>> .build(); > >>> StreamTableEnvironment tableEnv = > >>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); > >>> > >>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > >>> CheckpointingMode.EXACTLY_ONCE); > >>> > >>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > >>> Duration.ofMinutes(1)); > >>> > >>> String catalogName = "hive_catalog"; > >>> HiveCatalog catalog = new HiveCatalog( > >>> catalogName, > >>> "default", > >>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", > >>> "2.3.4" > >>> ); > >>> tableEnv.registerCatalog(catalogName, catalog); > >>> tableEnv.useCatalog(catalogName); > >>> > >>> MyDateFormat2 myDateFormat = new MyDateFormat2(); > >>> tableEnv.registerFunction("my_date_format", myDateFormat); > >>> > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); > >>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); > >>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + > >>> " team_id INT,\n" + > >>> " team_name STRING,\n" + > >>> " create_time TIMESTAMP,\n" + > >>> " update_time TIMESTAMP,\n" + > >>> " proctime as proctime()\n" + > >>> ") WITH (\n" + > >>> " 'connector' = 'mysql-cdc',\n" + > >>> " 'hostname' = 'localhost',\n" + > >>> " 'port' = '3306',\n" + > >>> " 'username' = 'root',\n" + > >>> " 'password' = 'root',\n" + > >>> " 'database-name' = 'test',\n" + > >>> " 'table-name' = 'team'\n" + > >>> ")"); > >>> > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); > >>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); > >>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + > >>> " team_id INT,\n" + > >>> " team_name STRING,\n" + > >>> " create_time TIMESTAMP,\n" + > >>> " update_time TIMESTAMP\n" + > >>> ") WITH (\n" + > >>> " 'connector' = 'kafka',\n" + > >>> " 'topic' = 'team',\n" + > >>> " 'scan.startup.mode' = 'earliest-offset',\n" + > >>> " 'properties.bootstrap.servers' = > 'localhost:9092',\n" > >>> + > >>> " 'format' = 'changelog-json'\n" + > >>> ")"); > >>> > >>> tableEnv.executeSql("INSERT INTO kafka.team \n" + > >>> "SELECT team_id, team_name, create_time, update_time > \n" > >>> + > >>> "FROM cdc.team"); > >>> > >>> // 定义带op字段的stream > >>> Properties properties = new Properties(); > >>> properties.setProperty("bootstrap.servers", "localhost:9092"); > >>> properties.setProperty("group.id", "test1`"); > >>> > >>> FlinkKafkaConsumerBase<String> consumer = new > >>> FlinkKafkaConsumer<>( > >>> "team", > >>> new SimpleStringSchema(), > >>> properties > >>> ).setStartFromEarliest(); > >>> > >>> DataStream<String> ds = streamEnv.addSource(consumer); > >>> > >>> String[] fieldNames = {"team_id", "team_name", "create_time", > >>> "update_time", "op"}; > >>> TypeInformation[] types = {Types.INT, Types.STRING, > >>> Types.STRING, Types.STRING, Types.STRING}; > >>> DataStream<Row> ds2 = ds.map(str -> { > >>> JSONObject jsonObject = JSON.parseObject(str); > >>> String op = jsonObject.getString("op"); > >>> JSONObject data = jsonObject.getJSONObject("data"); > >>> int arity = fieldNames.length; > >>> Row row = new Row(arity); > >>> row.setField(0, data.get("team_id")); > >>> row.setField(1, data.get("team_name")); > >>> row.setField(2, data.get("create_time")); > >>> row.setField(3, data.get("update_time")); > >>> String operation = getOperation(op); > >>> row.setField(4, operation); > >>> > >>> return row; > >>> }, new RowTypeInfo(types, fieldNames)) > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> > >>> *.assignTimestampsAndWatermarks(new > >>> BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) { > >>> @Override public long extractTimestamp(Row row) { > >>> String dt = (String) row.getField(2); LocalDateTime ldt > = > >>> LocalDateTime.parse(dt, dtf); Instant instant = > >>> ldt.atZone(ZoneId.systemDefault()).toInstant(); long > >>> timeInMillis = instant.toEpochMilli(); return > timeInMillis; > >>> } });* > >>> > >>> tableEnv.registerDataStream("merged_team", ds2); > >>> > >>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > >>> > >>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > >>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > >>> > >>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + > >>> " team_id INT,\n" + > >>> " team_name STRING,\n" + > >>> " create_time STRING,\n" + > >>> " update_time STRING,\n" + > >>> " op STRING\n" + > >>> ") PARTITIONED BY (\n" + > >>> " dt STRING,\n" + > >>> " hr STRING,\n" + > >>> " mi STRING\n" + > >>> ") STORED AS PARQUET TBLPROPERTIES (\n" + > >>> " 'sink.partition-commit.trigger' = > >>> 'partition-time',\n" + > >>> " 'sink.partition-commit.delay' = '1 min',\n" + > >>> " 'sink.partition-commit.policy.kind' = > >>> 'metastore,success-file',\n" + > >>> " 'partition.time-extractor.timestamp-pattern' = '$dt > >>> $hr:$mi:00'\n" + > >>> ")"); > >>> > >>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > >>> tableEnv.executeSql("INSERT INTO ods.team \n" + > >>> "SELECT team_id, team_name, create_time, update_time, > >>> op, \n" + > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>> HH:mm:ss'), 'yyyyMMdd') as dt, \n" + > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>> HH:mm:ss'), 'HH') as hr, \n" + > >>> " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>> HH:mm:ss'), 'mm') as mi \n" + > >>> "FROM merged_team"); > >>> tableEnv.execute("MysqlCDC2Hive2"); > >>> > >>> streamEnv.execute(""); > >>> } > >>> > >>> private static String getOperation(String op) { > >>> String operation = "INSERT"; > >>> for (RowKind rk : RowKind.values()) { > >>> if (rk.shortString().equals(op)) { > >>> switch (rk) { > >>> case UPDATE_BEFORE: > >>> case UPDATE_AFTER: > >>> operation = "UPDATE"; > >>> break; > >>> case DELETE: > >>> operation = "DELETE"; > >>> break; > >>> case INSERT: > >>> default: > >>> operation = "INSERT"; > >>> break; > >>> } > >>> break; > >>> } > >>> } > >>> return operation; > >>> } > >>> } > >>> > >>> Jark Wu <[email protected]> 于2020年11月1日周日 上午11:04写道: > >>> > >>>> 你检查一下 hive 文件是否正常生成了? > >>>> > >>>> 我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger > >>>> policy 是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。 > >>>> > >>>> Best, > >>>> Jark > >>>> > >>>> > >>>> [1]: > >>>> > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger > >>>> > >>>> On Sat, 31 Oct 2020 at 17:25, 陈帅 <[email protected]> wrote: > >>>> > >>>>> > 谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive > >>>>> shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗? > >>>>> > >>>>> cdc -> kafka示例消息如下 > >>>>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 > >>>>> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"} > >>>>> > >>>>> import com.alibaba.fastjson.JSON; > >>>>> import com.alibaba.fastjson.JSONObject; > >>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema; > >>>>> import org.apache.flink.api.common.typeinfo.TypeInformation; > >>>>> import org.apache.flink.api.common.typeinfo.Types; > >>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo; > >>>>> import org.apache.flink.streaming.api.CheckpointingMode; > >>>>> import org.apache.flink.streaming.api.TimeCharacteristic; > >>>>> import org.apache.flink.streaming.api.datastream.DataStream; > >>>>> import > >>>>> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; > >>>>> import > >>>>> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > >>>>> import > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > >>>>> import > >>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; > >>>>> import org.apache.flink.table.api.EnvironmentSettings; > >>>>> import org.apache.flink.table.api.SqlDialect; > >>>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >>>>> import org.apache.flink.table.catalog.hive.HiveCatalog; > >>>>> import org.apache.flink.types.Row; > >>>>> import org.apache.flink.types.RowKind; > >>>>> > >>>>> import java.time.Duration; > >>>>> import java.util.Properties; > >>>>> > >>>>> public class MysqlCDC2Hive { > >>>>> public static void main(String[] args) throws Exception { > >>>>> StreamExecutionEnvironment streamEnv = > >>>>> StreamExecutionEnvironment.getExecutionEnvironment(); > >>>>> > >>>>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > >>>>> streamEnv.setParallelism(3); > >>>>> streamEnv.enableCheckpointing(60000); > >>>>> > >>>>> EnvironmentSettings tableEnvSettings = > >>>>> EnvironmentSettings.newInstance() > >>>>> .useBlinkPlanner() > >>>>> .inStreamingMode() > >>>>> .build(); > >>>>> StreamTableEnvironment tableEnv = > >>>>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); > >>>>> > >>>>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > >>>>> CheckpointingMode.EXACTLY_ONCE); > >>>>> > >>>>> > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > >>>>> Duration.ofMinutes(1)); > >>>>> > >>>>> String catalogName = "hive_catalog"; > >>>>> HiveCatalog catalog = new HiveCatalog( > >>>>> catalogName, > >>>>> "default", > >>>>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", > >>>>> "2.3.4" > >>>>> ); > >>>>> tableEnv.registerCatalog(catalogName, catalog); > >>>>> tableEnv.useCatalog(catalogName); > >>>>> > >>>>> MyDateFormat2 myDateFormat = new MyDateFormat2(); > >>>>> tableEnv.registerFunction("my_date_format", myDateFormat); > >>>>> > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); > >>>>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" + > >>>>> " team_id INT,\n" + > >>>>> " team_name STRING,\n" + > >>>>> " create_time TIMESTAMP,\n" + > >>>>> " update_time TIMESTAMP,\n" + > >>>>> " proctime as proctime()\n" + > >>>>> ") WITH (\n" + > >>>>> " 'connector' = 'mysql-cdc',\n" + > >>>>> " 'hostname' = 'localhost',\n" + > >>>>> " 'port' = '3306',\n" + > >>>>> " 'username' = 'root',\n" + > >>>>> " 'password' = 'root',\n" + > >>>>> " 'database-name' = 'test',\n" + > >>>>> " 'table-name' = 'team'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka"); > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); > >>>>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" + > >>>>> " team_id INT,\n" + > >>>>> " team_name STRING,\n" + > >>>>> " create_time TIMESTAMP,\n" + > >>>>> " update_time TIMESTAMP\n" + > >>>>> ") WITH (\n" + > >>>>> " 'connector' = 'kafka',\n" + > >>>>> " 'topic' = 'team',\n" + > >>>>> " 'scan.startup.mode' = 'earliest-offset',\n" + > >>>>> " 'properties.bootstrap.servers' = > >>>>> 'localhost:9092',\n" + > >>>>> " 'format' = 'changelog-json'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.executeSql("INSERT INTO kafka.team \n" + > >>>>> "SELECT team_id, team_name, create_time, update_time > >>>>> \n" + > >>>>> "FROM cdc.team"); > >>>>> > >>>>> // 定义带op字段的stream > >>>>> Properties properties = new Properties(); > >>>>> properties.setProperty("bootstrap.servers", > "localhost:9092"); > >>>>> properties.setProperty("group.id", "test"); > >>>>> > >>>>> FlinkKafkaConsumerBase<String> consumer = new > >>>>> FlinkKafkaConsumer<>( > >>>>> "team", > >>>>> new SimpleStringSchema(), > >>>>> properties > >>>>> ).setStartFromEarliest(); > >>>>> > >>>>> DataStream<String> ds = streamEnv.addSource(consumer); > >>>>> > >>>>> String[] fieldNames = {"team_id", "team_name", "create_time", > >>>>> "update_time", "op"}; > >>>>> TypeInformation[] types = {Types.INT, Types.STRING, > >>>>> Types.STRING, Types.STRING, Types.STRING}; > >>>>> DataStream<Row> ds2 = ds.map(str -> { > >>>>> JSONObject jsonObject = JSON.parseObject(str); > >>>>> String op = jsonObject.getString("op"); > >>>>> JSONObject data = jsonObject.getJSONObject("data"); > >>>>> int arity = fieldNames.length; > >>>>> Row row = new Row(arity); > >>>>> row.setField(0, data.get("team_id")); > >>>>> row.setField(1, data.get("team_name")); > >>>>> row.setField(2, data.get("create_time")); > >>>>> row.setField(3, data.get("update_time")); > >>>>> String operation = getOperation(op); > >>>>> row.setField(4, operation); > >>>>> > >>>>> return row; > >>>>> }, new RowTypeInfo(types, fieldNames)); > >>>>> > >>>>> tableEnv.registerDataStream("merged_team", ds2); > >>>>> > >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > >>>>> > >>>>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > >>>>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > >>>>> > >>>>> tableEnv.executeSql("CREATE TABLE ods.team (\n" + > >>>>> " team_id INT,\n" + > >>>>> " team_name STRING,\n" + > >>>>> " create_time STRING,\n" + > >>>>> " update_time STRING,\n" + > >>>>> " op STRING\n" + > >>>>> // ") PARTITIONED BY (\n" + > >>>>> // " ts_date STRING,\n" + > >>>>> // " ts_hour STRING,\n" + > >>>>> // " ts_minute STRING\n" + > >>>>> ") STORED AS PARQUET TBLPROPERTIES (\n" + > >>>>> " 'sink.partition-commit.trigger' = > >>>>> 'partition-time',\n" + > >>>>> " 'sink.partition-commit.delay' = '1 min',\n" + > >>>>> " 'sink.partition-commit.policy.kind' = > >>>>> 'metastore,success-file',\n" + > >>>>> " 'partition.time-extractor.timestamp-pattern' = > >>>>> '$ts_date $ts_hour:$ts_minute:00'\n" + > >>>>> ")"); > >>>>> > >>>>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > >>>>> tableEnv.executeSql("INSERT INTO ods.team \n" + > >>>>> "SELECT team_id, team_name, create_time, update_time, > >>>>> op \n" + > >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>>>> HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" + > >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>>>> HH:mm:ss'), 'HH') as ts_hour, \n" + > >>>>> // " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > >>>>> HH:mm:ss'), 'mm') as ts_minute \n" + > >>>>> "FROM merged_team"); > >>>>> tableEnv.execute("MysqlCDC2Hive2"); > >>>>> > >>>>> streamEnv.execute(""); > >>>>> } > >>>>> > >>>>> private static String getOperation(String op) { > >>>>> String operation = "INSERT"; > >>>>> for (RowKind rk : RowKind.values()) { > >>>>> if (rk.shortString().equals(op)) { > >>>>> switch (rk) { > >>>>> case UPDATE_BEFORE: > >>>>> case UPDATE_AFTER: > >>>>> operation = "UPDATE"; > >>>>> break; > >>>>> case DELETE: > >>>>> operation = "DELETE"; > >>>>> break; > >>>>> case INSERT: > >>>>> default: > >>>>> operation = "INSERT"; > >>>>> break; > >>>>> } > >>>>> break; > >>>>> } > >>>>> } > >>>>> return operation; > >>>>> } > >>>>> } > >>>>> > >>>>> Jark Wu <[email protected]> 于2020年10月31日周六 下午1:45写道: > >>>>> > >>>>>> 1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是 hive > >>>>>> ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。 > >>>>>> > >>>>>> 2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming > >>>>>> 相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive > >>>>>> 中进行合并。merge过程可以参考这篇文章[1]。 > >>>>>> > >>>>>> 3. 你可以 ts + INTERVAL '8' HOUR > >>>>>> > >>>>>> PS: 在1.12中,我们计划让 hive 也能直接写 changelog 数据,这样的话 cdc 可以直接 -> hive > >>>>>> streaming,不需要中间的 kafka。 不过到了 hive 中后,仍然需要另外写 query 将数据做实时merge。 > >>>>>> > >>>>>> Best, > >>>>>> Jark > >>>>>> > >>>>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[email protected]> wrote: > >>>>>> > >>>>>>> hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。 > >>>>>>> > >>>>>>> > >>>>>>> | | > >>>>>>> 罗显宴 > >>>>>>> | > >>>>>>> | > >>>>>>> 邮箱:[email protected] > >>>>>>> | > >>>>>>> > >>>>>>> 签名由 网易邮箱大师 定制 > >>>>>>> > >>>>>>> 在2020年10月31日 12:06,陈帅 写道: > >>>>>>> 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 > >>>>>>> > >>>>>>> Exception in thread "main" > org.apache.flink.table.api.TableException: > >>>>>>> AppendStreamTableSink doesn't support consuming update and delete > >>>>>>> changes > >>>>>>> which is produced by node TableSourceScan(table=[[hive_catalog, > cdc, > >>>>>>> team]], fields=[team_id, team_name, create_time, update_time]) > >>>>>>> > >>>>>>> 我的问题: > >>>>>>> 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢? > >>>>>>> 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc -> > >>>>>>> kafka,然后kafka > >>>>>>> -> hive streaming? 谢谢! > >>>>>>> 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么? > >>>>>>> > >>>>>>> sql语句如下 > >>>>>>> > >>>>>>> CREATE DATABASE IF NOT EXISTS cdc > >>>>>>> > >>>>>>> DROP TABLE IF EXISTS cdc.team > >>>>>>> > >>>>>>> CREATE TABLE team( > >>>>>>> team_id BIGINT, > >>>>>>> team_name STRING, > >>>>>>> create_time TIMESTAMP, > >>>>>>> update_time TIMESTAMP, > >>>>>>> proctime as proctime() > >>>>>>> ) WITH ( > >>>>>>> 'connector' = 'mysql-cdc', > >>>>>>> 'hostname' = 'localhost', > >>>>>>> 'port' = '3306', > >>>>>>> 'username' = 'root', > >>>>>>> 'password' = 'root', > >>>>>>> 'database-name' = 'test', > >>>>>>> 'table-name' = 'team' > >>>>>>> ) > >>>>>>> > >>>>>>> CREATE DATABASE IF NOT EXISTS ods > >>>>>>> > >>>>>>> DROP TABLE IF EXISTS ods.team > >>>>>>> > >>>>>>> CREATE TABLE ods.team ( > >>>>>>> team_id BIGINT, > >>>>>>> team_name STRING, > >>>>>>> create_time TIMESTAMP, > >>>>>>> update_time TIMESTAMP, > >>>>>>> ) PARTITIONED BY ( > >>>>>>> ts_date STRING, > >>>>>>> ts_hour STRING, > >>>>>>> ts_minute STRING, > >>>>>>> ) STORED AS PARQUET TBLPROPERTIES ( > >>>>>>> 'sink.partition-commit.trigger' = 'partition-time', > >>>>>>> 'sink.partition-commit.delay' = '1 min', > >>>>>>> 'sink.partition-commit.policy.kind' = 'metastore,success-file', > >>>>>>> 'partition.time-extractor.timestamp-pattern' = '$ts_date > >>>>>>> $ts_hour:$ts_minute:00' > >>>>>>> ) > >>>>>>> > >>>>>>> INSERT INTO ods.team > >>>>>>> SELECT team_id, team_name, create_time, update_time, > >>>>>>> my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'), > >>>>>>> my_date_format(create_time,'HH', 'Asia/Shanghai'), > >>>>>>> my_date_format(create_time,'mm', 'Asia/Shanghai') > >>>>>>> FROM cdc.team > >>>>>>> > >>>>>> > -- Best regards! Rui Li
