- 你可以用 proc-time
- 或者在你的Source上添加 **UTC时区的Watermark**,注意是 **UTC**,SQL的watermark都是 **UTC**的

On Mon, Nov 2, 2020 at 10:38 AM Rui Li <lirui.fu...@gmail.com> wrote:

> Hi,
>
> 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success
> file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下
>
> LOG.info("Partition {} of table {} is ready to be committed",
> partSpec, tableIdentifier);
>
> LOG.info("Committed partition {} to metastore", partitionSpec);
>
> LOG.info("Committed partition {} with success file",
> context.partitionSpec());
>
>
> On Sun, Nov 1, 2020 at 5:36 PM 陈帅 <casel.c...@gmail.com> wrote:
>
> > 最后,在hive shell中执行 “msck repair table team;”  命令后就能查询到写的数据了,难道flink hive
> > streaming不能自动注册hive分区吗?还是我使用的姿势不对?
> >
> > 陈帅 <casel.c...@gmail.com> 于2020年11月1日周日 下午5:24写道:
> >
> > > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> > > ") STORED AS TEXTFILE TBLPROPERTIES ("
> > >
> > > 这是生成的hive表建表语句
> > >
> > > hive> show create table team;
> > > OK
> > > CREATE TABLE `team`(
> > >   `team_id` int,
> > >   `team_name` string,
> > >   `create_time` string,
> > >   `update_time` string,
> > >   `op` string)
> > > PARTITIONED BY (
> > >   `dt` string,
> > >   `hr` string,
> > >   `mi` string)
> > > ROW FORMAT SERDE
> > >   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
> > > STORED AS INPUTFORMAT
> > >   'org.apache.hadoop.mapred.TextInputFormat'
> > > OUTPUTFORMAT
> > >   'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
> > > LOCATION
> > >   'hdfs://localhost:9000/user/hive/warehouse/ods.db/team'
> > > TBLPROPERTIES (
> > >   'is_generic'='false',
> > >   'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
> > >   'sink.partition-commit.delay'='1 min',
> > >   'sink.partition-commit.policy.kind'='metastore,success-file',
> > >   'sink.partition-commit.trigger'='partition-time',
> > >   'transient_lastDdlTime'='1604222266')
> > > Time taken: 0.252 seconds, Fetched: 25 row(s)
> > >
> > > 另外,下载了hive文件内容如下
> > > 1001<0x01>Sun<0x01>2020-10-31 11:25:38<0x01>2020-10-31
> > 11:25:38<0x01>INSERT
> > >
> > > 还是查询不到结果
> > > hive> select * from team;
> > > OK
> > > Time taken: 0.326 seconds
> > >
> > > 陈帅 <casel.c...@gmail.com> 于2020年11月1日周日 下午5:10写道:
> > >
> > >>
> > >>
> >
> 之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
> > >> 生成的hive分区文件路径类似于
> > /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
> > >> part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
> > >>
> > >> 陈帅 <casel.c...@gmail.com> 于2020年11月1日周日 下午4:43写道:
> > >>
> > >>>
> >
> 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
> > >>> shell查不到数据。
> > >>>
> > >>> import com.alibaba.fastjson.JSON;
> > >>> import com.alibaba.fastjson.JSONObject;
> > >>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> > >>> import org.apache.flink.api.common.typeinfo.TypeInformation;
> > >>> import org.apache.flink.api.common.typeinfo.Types;
> > >>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> > >>> import org.apache.flink.streaming.api.CheckpointingMode;
> > >>> import org.apache.flink.streaming.api.TimeCharacteristic;
> > >>> import org.apache.flink.streaming.api.datastream.DataStream;
> > >>> import
> > >>>
> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> > >>> import
> > >>>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > >>> import
> > >>>
> >
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> > >>> import org.apache.flink.streaming.api.windowing.time.Time;
> > >>> import
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> > >>> import
> > >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> > >>> import org.apache.flink.table.api.EnvironmentSettings;
> > >>> import org.apache.flink.table.api.SqlDialect;
> > >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> > >>> import org.apache.flink.table.catalog.hive.HiveCatalog;
> > >>> import org.apache.flink.types.Row;
> > >>> import org.apache.flink.types.RowKind;
> > >>>
> > >>> import java.time.Duration;
> > >>> import java.time.Instant;
> > >>> import java.time.LocalDateTime;
> > >>> import java.time.ZoneId;
> > >>> import java.time.format.DateTimeFormatter;
> > >>> import java.util.Properties;
> > >>>
> > >>> public class MysqlCDC2Hive {
> > >>>
> > >>>     private static final DateTimeFormatter dtf =
> > >>> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
> > >>>
> > >>>     public static void main(String[] args) throws Exception {
> > >>>         StreamExecutionEnvironment streamEnv =
> > >>> StreamExecutionEnvironment.getExecutionEnvironment();
> > >>>
> > >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >>>         streamEnv.setParallelism(3);
> > >>>         streamEnv.enableCheckpointing(60000);
> > >>>
> > >>>         EnvironmentSettings tableEnvSettings =
> > >>> EnvironmentSettings.newInstance()
> > >>>                 .useBlinkPlanner()
> > >>>                 .inStreamingMode()
> > >>>                 .build();
> > >>>         StreamTableEnvironment tableEnv =
> > >>> StreamTableEnvironment.create(streamEnv, tableEnvSettings);
> > >>>
> > >>>
> >
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> > >>> CheckpointingMode.EXACTLY_ONCE);
> > >>>
> > >>>
> >
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> > >>> Duration.ofMinutes(1));
> > >>>
> > >>>         String catalogName = "hive_catalog";
> > >>>         HiveCatalog catalog = new HiveCatalog(
> > >>>                 catalogName,
> > >>>                 "default",
> > >>>                 "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
> > >>>                 "2.3.4"
> > >>>         );
> > >>>         tableEnv.registerCatalog(catalogName, catalog);
> > >>>         tableEnv.useCatalog(catalogName);
> > >>>
> > >>>         MyDateFormat2 myDateFormat = new MyDateFormat2();
> > >>>         tableEnv.registerFunction("my_date_format", myDateFormat);
> > >>>
> > >>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
> > >>>         tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
> > >>>         tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
> > >>>                 "    team_id INT,\n" +
> > >>>                 "    team_name STRING,\n" +
> > >>>                 "    create_time TIMESTAMP,\n" +
> > >>>                 "    update_time TIMESTAMP,\n" +
> > >>>                 "    proctime as proctime()\n" +
> > >>>                 ") WITH (\n" +
> > >>>                 "  'connector' = 'mysql-cdc',\n" +
> > >>>                 "  'hostname' = 'localhost',\n" +
> > >>>                 "  'port' = '3306',\n" +
> > >>>                 "  'username' = 'root',\n" +
> > >>>                 "  'password' = 'root',\n" +
> > >>>                 "  'database-name' = 'test',\n" +
> > >>>                 "  'table-name' = 'team'\n" +
> > >>>                 ")");
> > >>>
> > >>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
> > >>>         tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
> > >>>         tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
> > >>>                 "  team_id INT,\n" +
> > >>>                 "  team_name STRING,\n" +
> > >>>                 "  create_time TIMESTAMP,\n" +
> > >>>                 "  update_time TIMESTAMP\n" +
> > >>>                 ") WITH (\n" +
> > >>>                 "  'connector' = 'kafka',\n" +
> > >>>                 "  'topic' = 'team',\n" +
> > >>>                 "  'scan.startup.mode' = 'earliest-offset',\n" +
> > >>>                 "  'properties.bootstrap.servers' =
> > 'localhost:9092',\n"
> > >>> +
> > >>>                 "  'format' = 'changelog-json'\n" +
> > >>>                 ")");
> > >>>
> > >>>         tableEnv.executeSql("INSERT INTO kafka.team \n" +
> > >>>                 "SELECT team_id, team_name, create_time, update_time
> > \n"
> > >>> +
> > >>>                 "FROM cdc.team");
> > >>>
> > >>>         // 定义带op字段的stream
> > >>>         Properties properties = new Properties();
> > >>>         properties.setProperty("bootstrap.servers",
> "localhost:9092");
> > >>>         properties.setProperty("group.id", "test1`");
> > >>>
> > >>>         FlinkKafkaConsumerBase<String> consumer = new
> > >>> FlinkKafkaConsumer<>(
> > >>>                 "team",
> > >>>                 new SimpleStringSchema(),
> > >>>                 properties
> > >>>         ).setStartFromEarliest();
> > >>>
> > >>>         DataStream<String> ds = streamEnv.addSource(consumer);
> > >>>
> > >>>         String[] fieldNames = {"team_id", "team_name", "create_time",
> > >>> "update_time", "op"};
> > >>>         TypeInformation[] types = {Types.INT, Types.STRING,
> > >>> Types.STRING, Types.STRING, Types.STRING};
> > >>>         DataStream<Row> ds2 = ds.map(str -> {
> > >>>             JSONObject jsonObject = JSON.parseObject(str);
> > >>>             String op = jsonObject.getString("op");
> > >>>             JSONObject data = jsonObject.getJSONObject("data");
> > >>>             int arity = fieldNames.length;
> > >>>             Row row = new Row(arity);
> > >>>             row.setField(0, data.get("team_id"));
> > >>>             row.setField(1, data.get("team_name"));
> > >>>             row.setField(2, data.get("create_time"));
> > >>>             row.setField(3, data.get("update_time"));
> > >>>             String operation = getOperation(op);
> > >>>             row.setField(4, operation);
> > >>>
> > >>>             return row;
> > >>>         }, new RowTypeInfo(types, fieldNames))
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> *.assignTimestampsAndWatermarks(new
> > >>> BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) {
> > >>> @Override            public long extractTimestamp(Row row) {
> > >>> String dt = (String) row.getField(2);                LocalDateTime
> ldt
> > =
> > >>> LocalDateTime.parse(dt, dtf);                Instant instant =
> > >>> ldt.atZone(ZoneId.systemDefault()).toInstant();                long
> > >>> timeInMillis = instant.toEpochMilli();                return
> > timeInMillis;
> > >>>           }        });*
> > >>>
> > >>>         tableEnv.registerDataStream("merged_team", ds2);
> > >>>
> > >>>         tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >>>
> > >>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
> > >>>         tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
> > >>>
> > >>>         tableEnv.executeSql("CREATE TABLE ods.team (\n" +
> > >>>                 "  team_id INT,\n" +
> > >>>                 "  team_name STRING,\n" +
> > >>>                 "  create_time STRING,\n" +
> > >>>                 "  update_time STRING,\n" +
> > >>>                 "  op STRING\n" +
> > >>>                 ") PARTITIONED BY (\n" +
> > >>>                 "    dt STRING,\n" +
> > >>>                 "    hr STRING,\n" +
> > >>>                 "    mi STRING\n" +
> > >>>                 ") STORED AS PARQUET TBLPROPERTIES (\n" +
> > >>>                 "  'sink.partition-commit.trigger' =
> > >>> 'partition-time',\n" +
> > >>>                 "  'sink.partition-commit.delay' = '1 min',\n" +
> > >>>                 "  'sink.partition-commit.policy.kind' =
> > >>> 'metastore,success-file',\n" +
> > >>>                 "  'partition.time-extractor.timestamp-pattern' =
> '$dt
> > >>> $hr:$mi:00'\n" +
> > >>>                 ")");
> > >>>
> > >>>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >>>         tableEnv.executeSql("INSERT INTO ods.team \n" +
> > >>>                 "SELECT team_id, team_name, create_time, update_time,
> > >>> op, \n" +
> > >>>                 " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd
> > >>> HH:mm:ss'), 'yyyyMMdd') as dt, \n" +
> > >>>                 " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd
> > >>> HH:mm:ss'), 'HH') as hr, \n" +
> > >>>                 " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd
> > >>> HH:mm:ss'), 'mm') as mi \n" +
> > >>>                 "FROM merged_team");
> > >>>         tableEnv.execute("MysqlCDC2Hive2");
> > >>>
> > >>>         streamEnv.execute("");
> > >>>     }
> > >>>
> > >>>     private static String getOperation(String op) {
> > >>>         String operation = "INSERT";
> > >>>         for (RowKind rk : RowKind.values()) {
> > >>>             if (rk.shortString().equals(op)) {
> > >>>                 switch (rk) {
> > >>>                     case UPDATE_BEFORE:
> > >>>                     case UPDATE_AFTER:
> > >>>                         operation = "UPDATE";
> > >>>                         break;
> > >>>                     case DELETE:
> > >>>                         operation = "DELETE";
> > >>>                         break;
> > >>>                     case INSERT:
> > >>>                     default:
> > >>>                         operation = "INSERT";
> > >>>                         break;
> > >>>                 }
> > >>>                 break;
> > >>>             }
> > >>>         }
> > >>>         return operation;
> > >>>     }
> > >>> }
> > >>>
> > >>> Jark Wu <imj...@gmail.com> 于2020年11月1日周日 上午11:04写道:
> > >>>
> > >>>> 你检查一下 hive 文件是否正常生成了?
> > >>>>
> > >>>> 我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger
> > >>>> policy 是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。
> > >>>>
> > >>>> Best,
> > >>>> Jark
> > >>>>
> > >>>>
> > >>>> [1]:
> > >>>>
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger
> > >>>>
> > >>>> On Sat, 31 Oct 2020 at 17:25, 陈帅 <casel.c...@gmail.com> wrote:
> > >>>>
> > >>>>>
> >
> 谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive
> > >>>>> shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗?
> > >>>>>
> > >>>>> cdc -> kafka示例消息如下
> > >>>>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31
> > >>>>> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}
> > >>>>>
> > >>>>> import com.alibaba.fastjson.JSON;
> > >>>>> import com.alibaba.fastjson.JSONObject;
> > >>>>> import
> org.apache.flink.api.common.serialization.SimpleStringSchema;
> > >>>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
> > >>>>> import org.apache.flink.api.common.typeinfo.Types;
> > >>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> > >>>>> import org.apache.flink.streaming.api.CheckpointingMode;
> > >>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
> > >>>>> import org.apache.flink.streaming.api.datastream.DataStream;
> > >>>>> import
> > >>>>>
> > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> > >>>>> import
> > >>>>>
> > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > >>>>> import
> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> > >>>>> import
> > >>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> > >>>>> import org.apache.flink.table.api.EnvironmentSettings;
> > >>>>> import org.apache.flink.table.api.SqlDialect;
> > >>>>> import
> org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> > >>>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
> > >>>>> import org.apache.flink.types.Row;
> > >>>>> import org.apache.flink.types.RowKind;
> > >>>>>
> > >>>>> import java.time.Duration;
> > >>>>> import java.util.Properties;
> > >>>>>
> > >>>>> public class MysqlCDC2Hive {
> > >>>>>     public static void main(String[] args) throws Exception {
> > >>>>>         StreamExecutionEnvironment streamEnv =
> > >>>>> StreamExecutionEnvironment.getExecutionEnvironment();
> > >>>>>
> > >>>>>
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >>>>>         streamEnv.setParallelism(3);
> > >>>>>         streamEnv.enableCheckpointing(60000);
> > >>>>>
> > >>>>>         EnvironmentSettings tableEnvSettings =
> > >>>>> EnvironmentSettings.newInstance()
> > >>>>>                 .useBlinkPlanner()
> > >>>>>                 .inStreamingMode()
> > >>>>>                 .build();
> > >>>>>         StreamTableEnvironment tableEnv =
> > >>>>> StreamTableEnvironment.create(streamEnv, tableEnvSettings);
> > >>>>>
> > >>>>>
> >
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> > >>>>> CheckpointingMode.EXACTLY_ONCE);
> > >>>>>
> > >>>>>
> >
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> > >>>>> Duration.ofMinutes(1));
> > >>>>>
> > >>>>>         String catalogName = "hive_catalog";
> > >>>>>         HiveCatalog catalog = new HiveCatalog(
> > >>>>>                 catalogName,
> > >>>>>                 "default",
> > >>>>>                 "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
> > >>>>>                 "2.3.4"
> > >>>>>         );
> > >>>>>         tableEnv.registerCatalog(catalogName, catalog);
> > >>>>>         tableEnv.useCatalog(catalogName);
> > >>>>>
> > >>>>>         MyDateFormat2 myDateFormat = new MyDateFormat2();
> > >>>>>         tableEnv.registerFunction("my_date_format", myDateFormat);
> > >>>>>
> > >>>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
> > >>>>>         tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
> > >>>>>         tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
> > >>>>>                 "    team_id INT,\n" +
> > >>>>>                 "    team_name STRING,\n" +
> > >>>>>                 "    create_time TIMESTAMP,\n" +
> > >>>>>                 "    update_time TIMESTAMP,\n" +
> > >>>>>                 "    proctime as proctime()\n" +
> > >>>>>                 ") WITH (\n" +
> > >>>>>                 "  'connector' = 'mysql-cdc',\n" +
> > >>>>>                 "  'hostname' = 'localhost',\n" +
> > >>>>>                 "  'port' = '3306',\n" +
> > >>>>>                 "  'username' = 'root',\n" +
> > >>>>>                 "  'password' = 'root',\n" +
> > >>>>>                 "  'database-name' = 'test',\n" +
> > >>>>>                 "  'table-name' = 'team'\n" +
> > >>>>>                 ")");
> > >>>>>
> > >>>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
> > >>>>>         tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
> > >>>>>         tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
> > >>>>>                 "  team_id INT,\n" +
> > >>>>>                 "  team_name STRING,\n" +
> > >>>>>                 "  create_time TIMESTAMP,\n" +
> > >>>>>                 "  update_time TIMESTAMP\n" +
> > >>>>>                 ") WITH (\n" +
> > >>>>>                 "  'connector' = 'kafka',\n" +
> > >>>>>                 "  'topic' = 'team',\n" +
> > >>>>>                 "  'scan.startup.mode' = 'earliest-offset',\n" +
> > >>>>>                 "  'properties.bootstrap.servers' =
> > >>>>> 'localhost:9092',\n" +
> > >>>>>                 "  'format' = 'changelog-json'\n" +
> > >>>>>                 ")");
> > >>>>>
> > >>>>>         tableEnv.executeSql("INSERT INTO kafka.team \n" +
> > >>>>>                 "SELECT team_id, team_name, create_time,
> update_time
> > >>>>> \n" +
> > >>>>>                 "FROM cdc.team");
> > >>>>>
> > >>>>>         // 定义带op字段的stream
> > >>>>>         Properties properties = new Properties();
> > >>>>>         properties.setProperty("bootstrap.servers",
> > "localhost:9092");
> > >>>>>         properties.setProperty("group.id", "test");
> > >>>>>
> > >>>>>         FlinkKafkaConsumerBase<String> consumer = new
> > >>>>> FlinkKafkaConsumer<>(
> > >>>>>                 "team",
> > >>>>>                 new SimpleStringSchema(),
> > >>>>>                 properties
> > >>>>>         ).setStartFromEarliest();
> > >>>>>
> > >>>>>         DataStream<String> ds = streamEnv.addSource(consumer);
> > >>>>>
> > >>>>>         String[] fieldNames = {"team_id", "team_name",
> "create_time",
> > >>>>> "update_time", "op"};
> > >>>>>         TypeInformation[] types = {Types.INT, Types.STRING,
> > >>>>> Types.STRING, Types.STRING, Types.STRING};
> > >>>>>         DataStream<Row> ds2 = ds.map(str -> {
> > >>>>>             JSONObject jsonObject = JSON.parseObject(str);
> > >>>>>             String op = jsonObject.getString("op");
> > >>>>>             JSONObject data = jsonObject.getJSONObject("data");
> > >>>>>             int arity = fieldNames.length;
> > >>>>>             Row row = new Row(arity);
> > >>>>>             row.setField(0, data.get("team_id"));
> > >>>>>             row.setField(1, data.get("team_name"));
> > >>>>>             row.setField(2, data.get("create_time"));
> > >>>>>             row.setField(3, data.get("update_time"));
> > >>>>>             String operation = getOperation(op);
> > >>>>>             row.setField(4, operation);
> > >>>>>
> > >>>>>             return row;
> > >>>>>         }, new RowTypeInfo(types, fieldNames));
> > >>>>>
> > >>>>>         tableEnv.registerDataStream("merged_team", ds2);
> > >>>>>
> > >>>>>         tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> > >>>>>
> > >>>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
> > >>>>>         tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
> > >>>>>
> > >>>>>         tableEnv.executeSql("CREATE TABLE ods.team (\n" +
> > >>>>>                 "  team_id INT,\n" +
> > >>>>>                 "  team_name STRING,\n" +
> > >>>>>                 "  create_time STRING,\n" +
> > >>>>>                 "  update_time STRING,\n" +
> > >>>>>                 "  op STRING\n" +
> > >>>>> //                ") PARTITIONED BY (\n" +
> > >>>>> //                "    ts_date STRING,\n" +
> > >>>>> //                "    ts_hour STRING,\n" +
> > >>>>> //                "    ts_minute STRING\n" +
> > >>>>>                 ") STORED AS PARQUET TBLPROPERTIES (\n" +
> > >>>>>                 "  'sink.partition-commit.trigger' =
> > >>>>> 'partition-time',\n" +
> > >>>>>                 "  'sink.partition-commit.delay' = '1 min',\n" +
> > >>>>>                 "  'sink.partition-commit.policy.kind' =
> > >>>>> 'metastore,success-file',\n" +
> > >>>>>                 "  'partition.time-extractor.timestamp-pattern' =
> > >>>>> '$ts_date $ts_hour:$ts_minute:00'\n" +
> > >>>>>                 ")");
> > >>>>>
> > >>>>>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> > >>>>>         tableEnv.executeSql("INSERT INTO ods.team \n" +
> > >>>>>                 "SELECT team_id, team_name, create_time,
> update_time,
> > >>>>> op \n" +
> > >>>>> //                " DATE_FORMAT(TO_TIMESTAMP(create_time,
> 'yyyy-MM-dd
> > >>>>> HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" +
> > >>>>> //                " DATE_FORMAT(TO_TIMESTAMP(create_time,
> 'yyyy-MM-dd
> > >>>>> HH:mm:ss'), 'HH') as ts_hour, \n" +
> > >>>>> //                " DATE_FORMAT(TO_TIMESTAMP(create_time,
> 'yyyy-MM-dd
> > >>>>> HH:mm:ss'), 'mm') as ts_minute \n" +
> > >>>>>                 "FROM merged_team");
> > >>>>>         tableEnv.execute("MysqlCDC2Hive2");
> > >>>>>
> > >>>>>         streamEnv.execute("");
> > >>>>>     }
> > >>>>>
> > >>>>>     private static String getOperation(String op) {
> > >>>>>         String operation = "INSERT";
> > >>>>>         for (RowKind rk : RowKind.values()) {
> > >>>>>             if (rk.shortString().equals(op)) {
> > >>>>>                 switch (rk) {
> > >>>>>                     case UPDATE_BEFORE:
> > >>>>>                     case UPDATE_AFTER:
> > >>>>>                         operation = "UPDATE";
> > >>>>>                         break;
> > >>>>>                     case DELETE:
> > >>>>>                         operation = "DELETE";
> > >>>>>                         break;
> > >>>>>                     case INSERT:
> > >>>>>                     default:
> > >>>>>                         operation = "INSERT";
> > >>>>>                         break;
> > >>>>>                 }
> > >>>>>                 break;
> > >>>>>             }
> > >>>>>         }
> > >>>>>         return operation;
> > >>>>>     }
> > >>>>> }
> > >>>>>
> > >>>>> Jark Wu <imj...@gmail.com> 于2020年10月31日周六 下午1:45写道:
> > >>>>>
> > >>>>>> 1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是
> hive
> > >>>>>> ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。
> > >>>>>>
> > >>>>>> 2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive
> streaming
> > >>>>>> 相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive
> > >>>>>> 中进行合并。merge过程可以参考这篇文章[1]。
> > >>>>>>
> > >>>>>> 3. 你可以 ts + INTERVAL '8' HOUR
> > >>>>>>
> > >>>>>> PS: 在1.12中,我们计划让 hive 也能直接写 changelog 数据,这样的话 cdc 可以直接 -> hive
> > >>>>>> streaming,不需要中间的 kafka。 不过到了 hive 中后,仍然需要另外写 query 将数据做实时merge。
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> Jark
> > >>>>>>
> > >>>>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <15927482...@163.com> wrote:
> > >>>>>>
> > >>>>>>> hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> | |
> > >>>>>>> 罗显宴
> > >>>>>>> |
> > >>>>>>> |
> > >>>>>>> 邮箱:15927482...@163.com
> > >>>>>>> |
> > >>>>>>>
> > >>>>>>> 签名由 网易邮箱大师 定制
> > >>>>>>>
> > >>>>>>> 在2020年10月31日 12:06,陈帅 写道:
> > >>>>>>> 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛
> > >>>>>>>
> > >>>>>>> Exception in thread "main"
> > org.apache.flink.table.api.TableException:
> > >>>>>>> AppendStreamTableSink doesn't support consuming update and delete
> > >>>>>>> changes
> > >>>>>>> which is produced by node TableSourceScan(table=[[hive_catalog,
> > cdc,
> > >>>>>>> team]], fields=[team_id, team_name, create_time, update_time])
> > >>>>>>>
> > >>>>>>> 我的问题:
> > >>>>>>> 1. 是不是因为hive2不支持delete/update,如果换hive 3能否支持呢?
> > >>>>>>> 2. 如果要支持这种场景是不是中间需要加一层kafka介质(通过 changelog-json 格式),即cdc ->
> > >>>>>>> kafka,然后kafka
> > >>>>>>> -> hive streaming? 谢谢!
> > >>>>>>> 3. DATE_FORMAT函数出来的时间是UTC的,怎么转成GMT+8,只能通过UDF么?
> > >>>>>>>
> > >>>>>>> sql语句如下
> > >>>>>>>
> > >>>>>>> CREATE DATABASE IF NOT EXISTS cdc
> > >>>>>>>
> > >>>>>>> DROP TABLE IF EXISTS cdc.team
> > >>>>>>>
> > >>>>>>> CREATE TABLE team(
> > >>>>>>>    team_id BIGINT,
> > >>>>>>>    team_name STRING,
> > >>>>>>>    create_time TIMESTAMP,
> > >>>>>>>    update_time TIMESTAMP,
> > >>>>>>> proctime as proctime()
> > >>>>>>> ) WITH (
> > >>>>>>>  'connector' = 'mysql-cdc',
> > >>>>>>>  'hostname' = 'localhost',
> > >>>>>>>  'port' = '3306',
> > >>>>>>>  'username' = 'root',
> > >>>>>>>  'password' = 'root',
> > >>>>>>>  'database-name' = 'test',
> > >>>>>>>  'table-name' = 'team'
> > >>>>>>> )
> > >>>>>>>
> > >>>>>>> CREATE DATABASE IF NOT EXISTS ods
> > >>>>>>>
> > >>>>>>> DROP TABLE IF EXISTS ods.team
> > >>>>>>>
> > >>>>>>> CREATE TABLE ods.team (
> > >>>>>>>  team_id BIGINT,
> > >>>>>>>  team_name STRING,
> > >>>>>>>  create_time TIMESTAMP,
> > >>>>>>>  update_time TIMESTAMP,
> > >>>>>>> ) PARTITIONED BY (
> > >>>>>>>  ts_date STRING,
> > >>>>>>>  ts_hour STRING,
> > >>>>>>>  ts_minute STRING,
> > >>>>>>> ) STORED AS PARQUET TBLPROPERTIES (
> > >>>>>>>  'sink.partition-commit.trigger' = 'partition-time',
> > >>>>>>>  'sink.partition-commit.delay' = '1 min',
> > >>>>>>>  'sink.partition-commit.policy.kind' = 'metastore,success-file',
> > >>>>>>>  'partition.time-extractor.timestamp-pattern' = '$ts_date
> > >>>>>>> $ts_hour:$ts_minute:00'
> > >>>>>>> )
> > >>>>>>>
> > >>>>>>> INSERT INTO ods.team
> > >>>>>>> SELECT team_id, team_name, create_time, update_time,
> > >>>>>>>  my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'),
> > >>>>>>>  my_date_format(create_time,'HH', 'Asia/Shanghai'),
> > >>>>>>>  my_date_format(create_time,'mm', 'Asia/Shanghai')
> > >>>>>>> FROM cdc.team
> > >>>>>>>
> > >>>>>>
> >
>
>
> --
> Best regards!
> Rui Li
>


-- 
Best, Jingsong Lee

回复