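Could you check whether the Hive files were actually generated?

Looking at the code you posted, the kafka -> hive pipeline has no watermark, but the
"partition-time" trigger policy is driven by watermarks [1], so that may be why there
is no data in Hive.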
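
To illustrate why a missing watermark matters, here is a minimal plain-Java sketch of
the commit rule described in [1]: a partition is committed once
watermark > partition-time + delay. It does not call any Flink API. The class and
method names are made up for illustration, and "no watermark" is modelled as null,
which is a simplification.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: models the 'partition-time' commit rule.
class PartitionCommitSketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Builds the partition time the way the pattern
    // '$ts_date $ts_hour:$ts_minute:00' would, assuming ts_date is yyyy-MM-dd.
    static LocalDateTime partitionTime(String tsDate, String tsHour, String tsMinute) {
        return LocalDateTime.parse(tsDate + " " + tsHour + ":" + tsMinute + ":00", FMT);
    }

    // Commit only when watermark > partition-time + delay.
    // A null watermark stands for a pipeline that never emits one.
    static boolean shouldCommit(LocalDateTime watermark, LocalDateTime partitionTime, Duration delay) {
        if (watermark == null) {
            return false;
        }
        return watermark.isAfter(partitionTime.plus(delay));
    }

    public static void main(String[] args) {
        LocalDateTime pt = partitionTime("2020-10-31", "11", "25");
        Duration delay = Duration.ofMinutes(1);

        // No watermark: the partition is never committed.
        System.out.println(shouldCommit(null, pt, delay)); // prints false
        // Watermark equal to partition-time + delay: not yet strictly greater.
        System.out.println(shouldCommit(LocalDateTime.of(2020, 10, 31, 11, 26, 0), pt, delay)); // prints false
        // Watermark past partition-time + delay: commit.
        System.out.println(shouldCommit(LocalDateTime.of(2020, 10, 31, 11, 26, 1), pt, delay)); // prints true
    }
}
```

If adding a watermark is not practical, the same docs page also describes a
'process-time' trigger, which does not depend on watermarks.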

Best,
Jark


[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger

On Sat, 31 Oct 2020 at 17:25, 陈帅 <[email protected]> wrote:

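> Thanks, Jark, for the detailed answer. I tried the approach you suggested and ran
> into a problem: without Hive partitioning, both writing and reading work fine, but
> once I enable time partitioning on the Hive table, the write succeeds yet no data
> can be queried from the hive shell, even though the table schema is correct. (I have
> commented out the partition code below.) Could you take a look at what I got wrong?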
>
> A sample cdc -> kafka message looks like this:
> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}
>
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONObject;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.SqlDialect;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
> import org.apache.flink.types.Row;
> import org.apache.flink.types.RowKind;
>
> import java.time.Duration;
> import java.util.Properties;
>
> public class MysqlCDC2Hive {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         streamEnv.setParallelism(3);
>         streamEnv.enableCheckpointing(60000);
>
>         EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);
>
>         tableEnv.getConfig().getConfiguration().set(
>                 ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
>         tableEnv.getConfig().getConfiguration().set(
>                 ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(1));
>
>         String catalogName = "hive_catalog";
>         HiveCatalog catalog = new HiveCatalog(
>                 catalogName,
>                 "default",
>                 "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
>                 "2.3.4"
>         );
>         tableEnv.registerCatalog(catalogName, catalog);
>         tableEnv.useCatalog(catalogName);
>
>         MyDateFormat2 myDateFormat = new MyDateFormat2();
>         tableEnv.registerFunction("my_date_format", myDateFormat);
>
>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
>         tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
>         tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
>                 "    team_id INT,\n" +
>                 "    team_name STRING,\n" +
>                 "    create_time TIMESTAMP,\n" +
>                 "    update_time TIMESTAMP,\n" +
>                 "    proctime as proctime()\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'mysql-cdc',\n" +
>                 "  'hostname' = 'localhost',\n" +
>                 "  'port' = '3306',\n" +
>                 "  'username' = 'root',\n" +
>                 "  'password' = 'root',\n" +
>                 "  'database-name' = 'test',\n" +
>                 "  'table-name' = 'team'\n" +
>                 ")");
>
>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
>         tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
>         tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
>                 "  team_id INT,\n" +
>                 "  team_name STRING,\n" +
>                 "  create_time TIMESTAMP,\n" +
>                 "  update_time TIMESTAMP\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'kafka',\n" +
>                 "  'topic' = 'team',\n" +
>                 "  'scan.startup.mode' = 'earliest-offset',\n" +
>                 "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
>                 "  'format' = 'changelog-json'\n" +
>                 ")");
>
>         tableEnv.executeSql("INSERT INTO kafka.team \n" +
>                 "SELECT team_id, team_name, create_time, update_time \n" +
>                 "FROM cdc.team");
>
>         // Define a stream that carries the op field
>         Properties properties = new Properties();
>         properties.setProperty("bootstrap.servers", "localhost:9092");
>         properties.setProperty("group.id", "test");
>
>         FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
>                 "team",
>                 new SimpleStringSchema(),
>                 properties
>         ).setStartFromEarliest();
>
>         DataStream<String> ds = streamEnv.addSource(consumer);
>
>         String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
>         TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
>         DataStream<Row> ds2 = ds.map(str -> {
>             JSONObject jsonObject = JSON.parseObject(str);
>             String op = jsonObject.getString("op");
>             JSONObject data = jsonObject.getJSONObject("data");
>             int arity = fieldNames.length;
>             Row row = new Row(arity);
>             row.setField(0, data.get("team_id"));
>             row.setField(1, data.get("team_name"));
>             row.setField(2, data.get("create_time"));
>             row.setField(3, data.get("update_time"));
>             String operation = getOperation(op);
>             row.setField(4, operation);
>
>             return row;
>         }, new RowTypeInfo(types, fieldNames));
>
>         tableEnv.registerDataStream("merged_team", ds2);
>
>         tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
>         tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
>
>         tableEnv.executeSql("CREATE TABLE ods.team (\n" +
>                 "  team_id INT,\n" +
>                 "  team_name STRING,\n" +
>                 "  create_time STRING,\n" +
>                 "  update_time STRING,\n" +
>                 "  op STRING\n" +
> //                ") PARTITIONED BY (\n" +
> //                "    ts_date STRING,\n" +
> //                "    ts_hour STRING,\n" +
> //                "    ts_minute STRING\n" +
>                 ") STORED AS PARQUET TBLPROPERTIES (\n" +
>                 "  'sink.partition-commit.trigger' = 'partition-time',\n" +
>                 "  'sink.partition-commit.delay' = '1 min',\n" +
>                 "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
>                 "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
>                 ")");
>
>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>         tableEnv.executeSql("INSERT INTO ods.team \n" +
>                 "SELECT team_id, team_name, create_time, update_time, op \n" +
> //                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" +
> //                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as ts_hour, \n" +
> //                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as ts_minute \n" +
>                 "FROM merged_team");
>         tableEnv.execute("MysqlCDC2Hive2");
>
>         streamEnv.execute("");
>     }
>
>     private static String getOperation(String op) {
>         String operation = "INSERT";
>         for (RowKind rk : RowKind.values()) {
>             if (rk.shortString().equals(op)) {
>                 switch (rk) {
>                     case UPDATE_BEFORE:
>                     case UPDATE_AFTER:
>                         operation = "UPDATE";
>                         break;
>                     case DELETE:
>                         operation = "DELETE";
>                         break;
>                     case INSERT:
>                     default:
>                         operation = "INSERT";
>                         break;
>                 }
>                 break;
>             }
>         }
>         return operation;
>     }
> }
>
> On Sat, 31 Oct 2020 at 13:45, Jark Wu <[email protected]> wrote:
>
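>> 1. Yes. Hive currently cannot consume a changelog directly, mainly because Hive's
>> support for CDC is weak. Even Hive's ACID/transaction feature is rarely used,
>> because it integrates poorly with other compute engines.
>>
>> 2. The cdc -> kafka -> hive streaming approach is feasible. However, kafka -> hive
>> streaming is effectively a raw data sync: what lands in Hive is still the CDC log
>> content, with no real-time merging, so users have to write their own query in Hive
>> to merge it. For the merge process, see this article [1].
>>
>> 3. You can use ts + INTERVAL '8' HOUR.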
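
For point 3, the effect of shifting a UTC timestamp by eight hours can be sketched in
plain Java with java.time. This is only an illustration of the arithmetic, not Flink
code; the class and method names are hypothetical, and the input is assumed to be a
UTC timestamp formatted as yyyy-MM-dd HH:mm:ss.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: two ways to turn a UTC timestamp string into GMT+8.
class UtcToGmt8Sketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Fixed shift, the same idea as ts + INTERVAL '8' HOUR.
    static String plusEightHours(String utcTs) {
        return LocalDateTime.parse(utcTs, FMT).plusHours(8).format(FMT);
    }

    // Zone-aware conversion, e.g. zoneId = "Asia/Shanghai".
    static String toZone(String utcTs, String zoneId) {
        return LocalDateTime.parse(utcTs, FMT)
                .atOffset(ZoneOffset.UTC)
                .atZoneSameInstant(ZoneId.of(zoneId))
                .format(FMT);
    }

    public static void main(String[] args) {
        System.out.println(plusEightHours("2020-10-31 03:25:38"));          // prints 2020-10-31 11:25:38
        System.out.println(toZone("2020-10-31 20:00:00", "Asia/Shanghai")); // prints 2020-11-01 04:00:00
    }
}
```

The fixed shift is enough for a zone without daylight saving time such as
Asia/Shanghai; the zone-aware variant is shown only for comparison.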
>>
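>> PS: In 1.12 we plan to let Hive write changelog data directly, so that cdc can go
>> straight to hive streaming without Kafka in between. Even then, once the data is in
>> Hive you still need to write a separate query to merge it in real time.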
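
As a rough idea of what such a merge has to do, here is a minimal plain-Java sketch
that replays changelog records in order and keeps the latest row per key. It is not
the query from the referenced article and uses no Flink or Hive API. The Change class
is hypothetical, and the op values are assumed to be the raw short strings
+I / -U / +U / -D seen in the changelog-json messages.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: collapse an ordered changelog into the current table state.
class ChangelogMergeSketch {
    // Minimal stand-in for one changelog record (key, one value column, op).
    static final class Change {
        final int teamId;
        final String teamName;
        final String op;

        Change(int teamId, String teamName, String op) {
            this.teamId = teamId;
            this.teamName = teamName;
            this.op = op;
        }
    }

    // Replays the changes in arrival order and returns team_id -> team_name.
    static Map<Integer, String> merge(List<Change> changes) {
        Map<Integer, String> state = new LinkedHashMap<>();
        for (Change c : changes) {
            switch (c.op) {
                case "+I": // insert
                case "+U": // new image of an updated row
                    state.put(c.teamId, c.teamName);
                    break;
                case "-D": // delete
                    state.remove(c.teamId);
                    break;
                default:   // "-U" (old image) is skipped; the following "+U" carries the new value
                    break;
            }
        }
        return state;
    }
}
```

In Hive the same idea would be expressed as a query over the raw log table, which is
what the article referenced in point 2 is said to cover.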
>>
>> Best,
>> Jark
>>
>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[email protected]> wrote:
>>
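>>> Hive 3 can, Hive 2 cannot; switching to Kafka probably won't help either. Before
>>> Hive 3, updates to warehouse data were generally not supported. I'm not sure my
>>> answer is right, so corrections are welcome.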
>>>
>>>
>>> 罗显宴
>>>
>>> On 31 Oct 2020 at 12:06, 陈帅 wrote:
>>> I want to use Flink SQL's mysql-cdc connector to sync MySQL table data into Hive
>>> in real time. When I run it, it throws:
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> AppendStreamTableSink doesn't support consuming update and delete changes
>>> which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
>>> team]], fields=[team_id, team_name, create_time, update_time])
>>>
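>>> My questions:
>>> 1. Is this because Hive 2 does not support delete/update? Would switching to
>>> Hive 3 make it work?
>>> 2. To support this scenario, do I need to add a Kafka layer in between (using the
>>> changelog-json format), i.e. cdc -> kafka, then kafka -> hive streaming? Thanks!
>>> 3. The time produced by the DATE_FORMAT function is in UTC. How do I convert it to
>>> GMT+8? Is a UDF the only way?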
>>>
>>> The SQL statements are as follows:
>>>
>>> CREATE DATABASE IF NOT EXISTS cdc
>>>
>>> DROP TABLE IF EXISTS cdc.team
>>>
>>> CREATE TABLE team(
>>>    team_id BIGINT,
>>>    team_name STRING,
>>>    create_time TIMESTAMP,
>>>    update_time TIMESTAMP,
>>> proctime as proctime()
>>> ) WITH (
>>>  'connector' = 'mysql-cdc',
>>>  'hostname' = 'localhost',
>>>  'port' = '3306',
>>>  'username' = 'root',
>>>  'password' = 'root',
>>>  'database-name' = 'test',
>>>  'table-name' = 'team'
>>> )
>>>
>>> CREATE DATABASE IF NOT EXISTS ods
>>>
>>> DROP TABLE IF EXISTS ods.team
>>>
>>> CREATE TABLE ods.team (
>>>  team_id BIGINT,
>>>  team_name STRING,
>>>  create_time TIMESTAMP,
>>>  update_time TIMESTAMP
>>> ) PARTITIONED BY (
>>>  ts_date STRING,
>>>  ts_hour STRING,
>>>  ts_minute STRING
>>> ) STORED AS PARQUET TBLPROPERTIES (
>>>  'sink.partition-commit.trigger' = 'partition-time',
>>>  'sink.partition-commit.delay' = '1 min',
>>>  'sink.partition-commit.policy.kind' = 'metastore,success-file',
>>>  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
>>> )
>>>
>>> INSERT INTO ods.team
>>> SELECT team_id, team_name, create_time, update_time,
>>>  my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'),
>>>  my_date_format(create_time,'HH', 'Asia/Shanghai'),
>>>  my_date_format(create_time,'mm', 'Asia/Shanghai')
>>> FROM cdc.team
>>>
>>
