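- You can use proc-time (i.e. the `process-time` partition-commit trigger), or
- add a **UTC-timezone watermark** on your Source. Note that it must be **UTC**: watermarks in SQL are always interpreted as **UTC**.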
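The UTC point can be checked with plain `java.time`. This is a minimal sketch that rests on two assumptions.

- As described above, the partition time built from `'$dt $hr:$mi:00'` is read as a UTC wall clock.
- The `partition-time` trigger commits a partition once the watermark passes the partition time plus `sink.partition-commit.delay`.

The sketch compares two watermarks:

- the one produced by the `extractTimestamp` in 陈帅's code, which uses `ZoneId.systemDefault()` (assumed here to be Asia/Shanghai, GMT+8);
- one that parses `create_time` as UTC.

The class and method names are hypothetical. The later event time is an illustrative value, not one taken from the thread.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: compares a system-zone watermark with a UTC watermark
// against the assumed partition-time commit rule (watermark > partition time + delay).
class UtcWatermarkSketch {
    static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Reads the wall-clock string as UTC, which is what the SQL side is assumed to do.
    static long utcMillis(String wallClock) {
        return LocalDateTime.parse(wallClock, DTF).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Reads the wall-clock string in the given zone, as ZoneId.systemDefault() does in the job.
    static long zonedMillis(String wallClock, ZoneId zone) {
        return LocalDateTime.parse(wallClock, DTF).atZone(zone).toInstant().toEpochMilli();
    }

    // Assumed commit condition of the 'partition-time' trigger.
    static boolean commits(long watermark, long partitionTime, long delayMillis) {
        return watermark > partitionTime + delayMillis;
    }

    public static void main(String[] args) {
        long delay = 60_000L;      // 'sink.partition-commit.delay' = '1 min'
        long outOfOrder = 60_000L; // BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1))
        long partitionTime = utcMillis("2020-10-31 11:25:00"); // dt/hr/mi of the sample record
        String laterEvent = "2020-10-31 11:30:00";             // illustrative later create_time

        // The out-of-orderness extractor emits max timestamp minus the bound as its watermark.
        long wmShanghai = zonedMillis(laterEvent, ZoneId.of("Asia/Shanghai")) - outOfOrder;
        long wmUtc = utcMillis(laterEvent) - outOfOrder;

        System.out.println("GMT+8 watermark commits: " + commits(wmShanghai, partitionTime, delay));
        System.out.println("UTC watermark commits:   " + commits(wmUtc, partitionTime, delay));
        System.out.println("lag (hours): " + (wmUtc - wmShanghai) / 3_600_000L);
    }
}
```

Under these assumptions, the GMT+8 watermark trails the UTC partition time by eight hours. The partition is therefore not committed until event time has moved eight hours further. Parsing `create_time` with `ZoneOffset.UTC` in `extractTimestamp` removes that gap.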
On Mon, Nov 2, 2020 at 10:38 AM Rui Li <lirui.fu...@gmail.com> wrote:

Hi,

Partitions are normally committed automatically. Your commit policy is set to metastore+success-file, so check whether a success file was created under the partition directory. If there is no success file either, the partition commit was never triggered. A partition commit also prints log lines like the following, which you can search for in the logs:

    LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier);
    LOG.info("Committed partition {} to metastore", partitionSpec);
    LOG.info("Committed partition {} with success file", context.partitionSpec());

On Sun, Nov 1, 2020 at 5:36 PM 陈帅 <casel.c...@gmail.com> wrote:

In the end, after running `msck repair table team;` in the Hive shell, the written data became queryable. Can Flink Hive streaming not register Hive partitions automatically, or am I using it the wrong way?

On Sun, Nov 1, 2020 at 5:24 PM 陈帅 <casel.c...@gmail.com> wrote:

I switched the Hive table to TEXTFILE storage so that I could download the Hive files and inspect their contents:

    ") STORED AS TEXTFILE TBLPROPERTIES ("

This is the generated Hive DDL:

    hive> show create table team;
    OK
    CREATE TABLE `team`(
      `team_id` int,
      `team_name` string,
      `create_time` string,
      `update_time` string,
      `op` string)
    PARTITIONED BY (
      `dt` string,
      `hr` string,
      `mi` string)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
    TBLPROPERTIES (
      'is_generic'='false',
      'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
      'sink.partition-commit.delay'='1 min',
      'sink.partition-commit.policy.kind'='metastore,success-file',
      'sink.partition-commit.trigger'='partition-time',
      'transient_lastDdlTime'='1604222266')
    Time taken: 0.252 seconds, Fetched: 25 row(s)

The downloaded Hive file contains:

    1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31 11:25:38<0x01>INSERT

The query still returns nothing:

    hive> select * from team;
    OK
    Time taken: 0.326 seconds

On Sun, Nov 1, 2020 at 5:10 PM 陈帅 <casel.c...@gmail.com> wrote:

Before I added the watermark and the partitioning, the Hive files were written and could be queried. After adding partitions, the Hive files are still generated but cannot be queried, so I don't think this has much to do with whether a watermark is set. The generated Hive partition file path looks like:

    /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3

On Sun, Nov 1, 2020 at 4:43 PM 陈帅 <casel.c...@gmail.com> wrote:

I checked: the Hive files are generated according to the partitions I defined. Following your suggestion, I added a watermark on the ds2 stream. After running it, the Hive files were generated too, but again the Hive shell returns no data.

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    import java.time.Duration;
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;
    import java.util.Properties;

    public class MysqlCDC2Hive {

        private static final DateTimeFormatter dtf =
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            streamEnv.setParallelism(3);
            streamEnv.enableCheckpointing(60000);

            EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);
            tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
            tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));

            String catalogName = "hive_catalog";
            HiveCatalog catalog = new HiveCatalog(
                    catalogName,
                    "default",
                    "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                    "2.3.4"
            );
            tableEnv.registerCatalog(catalogName, catalog);
            tableEnv.useCatalog(catalogName);

            MyDateFormat2 myDateFormat = new MyDateFormat2();
            tableEnv.registerFunction("my_date_format", myDateFormat);

            tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
            tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
            tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
                    "  team_id INT,\n" +
                    "  team_name STRING,\n" +
                    "  create_time TIMESTAMP,\n" +
                    "  update_time TIMESTAMP,\n" +
                    "  proctime as proctime()\n" +
                    ") WITH (\n" +
                    "  'connector' = 'mysql-cdc',\n" +
                    "  'hostname' = 'localhost',\n" +
                    "  'port' = '3306',\n" +
                    "  'username' = 'root',\n" +
                    "  'password' = 'root',\n" +
                    "  'database-name' = 'test',\n" +
                    "  'table-name' = 'team'\n" +
                    ")");

            tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
            tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
            tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
                    "  team_id INT,\n" +
                    "  team_name STRING,\n" +
                    "  create_time TIMESTAMP,\n" +
                    "  update_time TIMESTAMP\n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'team',\n" +
                    "  'scan.startup.mode' = 'earliest-offset',\n" +
                    "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                    "  'format' = 'changelog-json'\n" +
                    ")");

            tableEnv.executeSql("INSERT INTO kafka.team \n" +
                    "SELECT team_id, team_name, create_time, update_time \n" +
                    "FROM cdc.team");

            // Define a stream that carries the op field
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "test1");

            FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                    "team",
                    new SimpleStringSchema(),
                    properties
            ).setStartFromEarliest();

            DataStream<String> ds = streamEnv.addSource(consumer);

            String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
            TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
            DataStream<Row> ds2 = ds.map(str -> {
                JSONObject jsonObject = JSON.parseObject(str);
                String op = jsonObject.getString("op");
                JSONObject data = jsonObject.getJSONObject("data");
                int arity = fieldNames.length;
                Row row = new Row(arity);
                row.setField(0, data.get("team_id"));
                row.setField(1, data.get("team_name"));
                row.setField(2, data.get("create_time"));
                row.setField(3, data.get("update_time"));
                String operation = getOperation(op);
                row.setField(4, operation);

                return row;
            }, new RowTypeInfo(types, fieldNames))
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) {
                        @Override
                        public long extractTimestamp(Row row) {
                            String dt = (String) row.getField(2);
                            LocalDateTime ldt = LocalDateTime.parse(dt, dtf);
                            Instant instant = ldt.atZone(ZoneId.systemDefault()).toInstant();
                            long timeInMillis = instant.toEpochMilli();
                            return timeInMillis;
                        }
                    });

            tableEnv.registerDataStream("merged_team", ds2);

            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

            tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
            tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");

            tableEnv.executeSql("CREATE TABLE ods.team (\n" +
                    "  team_id INT,\n" +
                    "  team_name STRING,\n" +
                    "  create_time STRING,\n" +
                    "  update_time STRING,\n" +
                    "  op STRING\n" +
                    ") PARTITIONED BY (\n" +
                    "  dt STRING,\n" +
                    "  hr STRING,\n" +
                    "  mi STRING\n" +
                    ") STORED AS PARQUET TBLPROPERTIES (\n" +
                    "  'sink.partition-commit.trigger' = 'partition-time',\n" +
                    "  'sink.partition-commit.delay' = '1 min',\n" +
                    "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                    "  'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00'\n" +
                    ")");

            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            tableEnv.executeSql("INSERT INTO ods.team \n" +
                    "SELECT team_id, team_name, create_time, update_time, op, \n" +
                    "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as dt, \n" +
                    "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as hr, \n" +
                    "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as mi \n" +
                    "FROM merged_team");
            tableEnv.execute("MysqlCDC2Hive2");

            streamEnv.execute("");
        }

        private static String getOperation(String op) {
            String operation = "INSERT";
            for (RowKind rk : RowKind.values()) {
                if (rk.shortString().equals(op)) {
                    switch (rk) {
                        case UPDATE_BEFORE:
                        case UPDATE_AFTER:
                            operation = "UPDATE";
                            break;
                        case DELETE:
                            operation = "DELETE";
                            break;
                        case INSERT:
                        default:
                            operation = "INSERT";
                            break;
                    }
                    break;
                }
            }
            return operation;
        }
    }

On Sun, Nov 1, 2020 at 11:04 AM Jark Wu <imj...@gmail.com> wrote:

Did you check whether the Hive files are being generated correctly?

Looking at your code above, there is no watermark in the kafka -> hive flow. The "partition-time" trigger policy [1] is driven by watermarks, so that may be why no data shows up in Hive.

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger

On Sat, 31 Oct 2020 at 17:25, 陈帅 <casel.c...@gmail.com> wrote:

Thanks, Jark, for the detailed answer. I tried your approach and hit one problem. Without Hive partitioning, writing and reading both work. After enabling time-based partitions on the Hive table, the write succeeds, but the Hive shell returns no data, even though the table schema is correct. (I have commented out the partitioning code below.) Could you help me see what I got wrong?

A sample cdc -> kafka message:

    {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    import java.time.Duration;
    import java.util.Properties;

    public class MysqlCDC2Hive {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            streamEnv.setParallelism(3);
            streamEnv.enableCheckpointing(60000);

            EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);
            tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
            tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));

            String catalogName = "hive_catalog";
            HiveCatalog catalog = new HiveCatalog(
                    catalogName,
                    "default",
                    "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                    "2.3.4"
            );
            tableEnv.registerCatalog(catalogName, catalog);
            tableEnv.useCatalog(catalogName);

            MyDateFormat2 myDateFormat = new MyDateFormat2();
            tableEnv.registerFunction("my_date_format", myDateFormat);

            tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
            tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
            tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
                    "  team_id INT,\n" +
                    "  team_name STRING,\n" +
                    "  create_time TIMESTAMP,\n" +
                    "  update_time TIMESTAMP,\n" +
                    "  proctime as proctime()\n" +
                    ") WITH (\n" +
                    "  'connector' = 'mysql-cdc',\n" +
                    "  'hostname' = 'localhost',\n" +
                    "  'port' = '3306',\n" +
                    "  'username' = 'root',\n" +
                    "  'password' = 'root',\n" +
                    "  'database-name' = 'test',\n" +
                    "  'table-name' = 'team'\n" +
                    ")");

            tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
            tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
            tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
                    "  team_id INT,\n" +
                    "  team_name STRING,\n" +
                    "  create_time TIMESTAMP,\n" +
                    "  update_time TIMESTAMP\n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'team',\n" +
                    "  'scan.startup.mode' = 'earliest-offset',\n" +
                    "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                    "  'format' = 'changelog-json'\n" +
                    ")");

            tableEnv.executeSql("INSERT INTO kafka.team \n" +
                    "SELECT team_id, team_name, create_time, update_time \n" +
                    "FROM cdc.team");

            // Define a stream that carries the op field
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "test");

            FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                    "team",
                    new SimpleStringSchema(),
                    properties
            ).setStartFromEarliest();

            DataStream<String> ds = streamEnv.addSource(consumer);

            String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
            TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
            DataStream<Row> ds2 = ds.map(str -> {
                JSONObject jsonObject = JSON.parseObject(str);
                String op = jsonObject.getString("op");
                JSONObject data = jsonObject.getJSONObject("data");
                int arity = fieldNames.length;
                Row row = new Row(arity);
                row.setField(0, data.get("team_id"));
                row.setField(1, data.get("team_name"));
                row.setField(2, data.get("create_time"));
                row.setField(3, data.get("update_time"));
                String operation = getOperation(op);
                row.setField(4, operation);

                return row;
            }, new RowTypeInfo(types, fieldNames));

            tableEnv.registerDataStream("merged_team", ds2);

            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

            tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
            tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");

            tableEnv.executeSql("CREATE TABLE ods.team (\n" +
                    "  team_id INT,\n" +
                    "  team_name STRING,\n" +
                    "  create_time STRING,\n" +
                    "  update_time STRING,\n" +
                    "  op STRING\n" +
    //                ") PARTITIONED BY (\n" +
    //                "  ts_date STRING,\n" +
    //                "  ts_hour STRING,\n" +
    //                "  ts_minute STRING\n" +
                    ") STORED AS PARQUET TBLPROPERTIES (\n" +
                    "  'sink.partition-commit.trigger' = 'partition-time',\n" +
                    "  'sink.partition-commit.delay' = '1 min',\n" +
                    "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                    "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
                    ")");

            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
            tableEnv.executeSql("INSERT INTO ods.team \n" +
                    "SELECT team_id, team_name, create_time, update_time, op \n" +
    //                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" +
    //                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as ts_hour, \n" +
    //                "  DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as ts_minute \n" +
                    "FROM merged_team");
            tableEnv.execute("MysqlCDC2Hive2");

            streamEnv.execute("");
        }

        private static String getOperation(String op) {
            String operation = "INSERT";
            for (RowKind rk : RowKind.values()) {
                if (rk.shortString().equals(op)) {
                    switch (rk) {
                        case UPDATE_BEFORE:
                        case UPDATE_AFTER:
                            operation = "UPDATE";
                            break;
                        case DELETE:
                            operation = "DELETE";
                            break;
                        case INSERT:
                        default:
                            operation = "INSERT";
                            break;
                    }
                    break;
                }
            }
            return operation;
        }
    }

On Sat, Oct 31, 2020 at 1:45 PM Jark Wu <imj...@gmail.com> wrote:

1. Yes. Hive currently cannot consume a changelog directly, mainly because Hive's CDC support is weak. Even Hive's ACID/transaction feature is rarely used, because it integrates poorly with other compute engines.

2. The cdc -> kafka -> hive streaming approach works. However, kafka -> hive streaming only syncs the raw data. What lands in Hive is still the CDC log content, not merged in real time, so you have to write your own query in Hive to do the merge. For the merge process, see this article [1].

3. You can use ts + INTERVAL '8' HOUR.

PS: In 1.12 we plan to let Hive write changelog data directly, so CDC can go straight into Hive streaming without Kafka in between. Once the data is in Hive, though, you still need a separate query to merge it in real time.

Best,
Jark

On Sat, 31 Oct 2020 at 13:26, 罗显宴 <15927482...@163.com> wrote:

Hive 3 supports this and Hive 2 does not, so switching to Kafka probably won't help. Before Hive 3, changes to warehouse data were generally not supported. I'm not sure my answer is right; corrections are welcome.

罗显宴

On Oct 31, 2020 at 12:06, 陈帅 wrote:

I want to use the Flink SQL mysql-cdc connector to sync MySQL table data directly into Hive in real time, but running the job throws:

    Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[hive_catalog, cdc, team]], fields=[team_id, team_name, create_time, update_time])

My questions:
1. Is this because Hive 2 does not support delete/update? Would switching to Hive 3 make it work?
2. To support this scenario, do I need an intermediate Kafka layer (using the changelog-json format), i.e. cdc -> kafka, then kafka -> hive streaming? Thanks!
3. The time returned by DATE_FORMAT is in UTC. How can I convert it to GMT+8? Is a UDF the only way?
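Question 3 and Jark's answer (`ts + INTERVAL '8' HOUR`) amount to shifting a UTC wall clock by a fixed offset before formatting it. The `java.time` sketch below is a hedged illustration, not Flink's implementation. It checks that, for Asia/Shanghai, the shifted value produces the same `yyyyMMdd` / `HH` / `mm` partition values as a real zone conversion. The class name and the sample instants are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper comparing "UTC wall clock + 8 hours" with a zone-aware conversion.
class PlusEightHoursSketch {
    static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");
    static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("HH");
    static final DateTimeFormatter MINUTE = DateTimeFormatter.ofPattern("mm");

    // Analogue of ts + INTERVAL '8' HOUR applied to a UTC wall clock.
    static LocalDateTime shifted(Instant instant) {
        return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).plusHours(8);
    }

    // Zone-aware conversion used as the reference result.
    static LocalDateTime inShanghai(Instant instant) {
        return LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai"));
    }

    // Partition values in the dt / hr / mi layout used by the INSERT statements in this thread.
    static String[] partitionValues(LocalDateTime t) {
        return new String[] {t.format(DAY), t.format(HOUR), t.format(MINUTE)};
    }

    public static void main(String[] args) {
        Instant createTime = Instant.parse("2020-10-31T03:25:38Z");
        System.out.println(String.join("/", partitionValues(shifted(createTime))));
        System.out.println(shifted(createTime).equals(inShanghai(createTime)));
    }
}
```

A constant shift matches the zone conversion only for zones without daylight-saving changes. That holds for Asia/Shanghai, which is why a fixed `INTERVAL '8' HOUR` is enough here.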
The SQL statements are as follows:

    CREATE DATABASE IF NOT EXISTS cdc;

    DROP TABLE IF EXISTS cdc.team;

    CREATE TABLE team(
      team_id BIGINT,
      team_name STRING,
      create_time TIMESTAMP,
      update_time TIMESTAMP,
      proctime as proctime()
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = 'root',
      'database-name' = 'test',
      'table-name' = 'team'
    );

    CREATE DATABASE IF NOT EXISTS ods;

    DROP TABLE IF EXISTS ods.team;

    CREATE TABLE ods.team (
      team_id BIGINT,
      team_name STRING,
      create_time TIMESTAMP,
      update_time TIMESTAMP
    ) PARTITIONED BY (
      ts_date STRING,
      ts_hour STRING,
      ts_minute STRING
    ) STORED AS PARQUET TBLPROPERTIES (
      'sink.partition-commit.trigger' = 'partition-time',
      'sink.partition-commit.delay' = '1 min',
      'sink.partition-commit.policy.kind' = 'metastore,success-file',
      'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
    );

    INSERT INTO ods.team
    SELECT team_id, team_name, create_time, update_time,
      my_date_format(create_time, 'yyyy-MM-dd', 'Asia/Shanghai'),
      my_date_format(create_time, 'HH', 'Asia/Shanghai'),
      my_date_format(create_time, 'mm', 'Asia/Shanghai')
    FROM cdc.team;

--
Best regards!
Rui Li

--
Best,
Jingsong Lee
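Jark's point 2 in this thread notes that the kafka -> hive streaming path only lands raw CDC log rows, so a separate query has to merge them. The article he links is not included in the thread. As a hedged illustration only, the Java sketch below applies one common merge rule to rows shaped like `ods.team`:

1. Keep the newest row per `team_id`, ordered by `update_time`.
2. On ties, let the later-arriving row win.
3. Drop keys whose latest operation is `DELETE`.

`TeamLog`, `merge` and the sample rows (including team 1002) are hypothetical. In Hive the same rule is usually written as a `ROW_NUMBER() OVER (PARTITION BY team_id ORDER BY update_time DESC)` query.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: merges CDC log rows (as written to ods.team) into a current snapshot.
class ChangelogMergeSketch {
    // Mirrors the ods.team columns used here; op is INSERT, UPDATE or DELETE as produced by getOperation.
    static final class TeamLog {
        final int teamId;
        final String teamName;
        final String updateTime; // "yyyy-MM-dd HH:mm:ss", so string order equals time order
        final String op;

        TeamLog(int teamId, String teamName, String updateTime, String op) {
            this.teamId = teamId;
            this.teamName = teamName;
            this.updateTime = updateTime;
            this.op = op;
        }
    }

    // Keeps the newest row per team_id (a later-arriving row wins ties), then drops deleted keys.
    static Map<Integer, TeamLog> merge(List<TeamLog> logs) {
        Map<Integer, TeamLog> latest = new TreeMap<>();
        for (TeamLog log : logs) {
            latest.merge(log.teamId, log,
                    (old, cur) -> cur.updateTime.compareTo(old.updateTime) >= 0 ? cur : old);
        }
        latest.values().removeIf(row -> "DELETE".equals(row.op));
        return latest;
    }

    public static void main(String[] args) {
        List<TeamLog> logs = List.of(
                new TeamLog(1001, "Sun", "2020-10-31 11:25:38", "INSERT"),
                new TeamLog(1002, "Moon", "2020-10-31 11:26:00", "INSERT"),
                new TeamLog(1001, "Suns", "2020-10-31 11:40:00", "UPDATE"),
                new TeamLog(1002, "Moon", "2020-10-31 11:26:00", "DELETE"));
        merge(logs).values().forEach(r -> System.out.println(r.teamId + " " + r.teamName));
    }
}
```

Letting later rows win ties matters because a `DELETE` row from changelog-json carries the same `update_time` as the last update it removes.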