Before I added the watermark and the partitioning, the Hive files were written and the data could be queried. After I enabled partitioning, the Hive files are still generated but the data can no longer be queried, so I don't think it has much to do with whether the watermark is set. The generated Hive partition file paths look like /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
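A minimal way to check whether this symptom is a partition-commit problem rather than a write problem, assuming the ods.team table defined in the code below (Hive shell, diagnostic sketch only, not verified against this setup):

    -- If the part files exist under /user/hive/warehouse/ods.db/team/...
    -- but this shows no partitions, the data was written but never committed
    -- to the metastore: the 'partition-time' trigger only commits a partition
    -- after the watermark passes partition time + 'sink.partition-commit.delay'.
    SHOW PARTITIONS ods.team;

    -- Register the existing partition directories manually (diagnostic only).
    -- If the data becomes queryable afterwards, the commit trigger / watermark
    -- is the likely culprit rather than the written files themselves.
    MSCK REPAIR TABLE ods.team;
    SELECT * FROM ods.team LIMIT 10;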
陈帅 <[email protected]> 于2020年11月1日周日 下午4:43写道: > 我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive > shell查不到数据。 > > import com.alibaba.fastjson.JSON; > import com.alibaba.fastjson.JSONObject; > import org.apache.flink.api.common.serialization.SimpleStringSchema; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.common.typeinfo.Types; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.streaming.api.CheckpointingMode; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.DataStream; > import > org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import > org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; > import org.apache.flink.streaming.api.windowing.time.Time; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.SqlDialect; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.catalog.hive.HiveCatalog; > import org.apache.flink.types.Row; > import org.apache.flink.types.RowKind; > > import java.time.Duration; > import java.time.Instant; > import java.time.LocalDateTime; > import java.time.ZoneId; > import java.time.format.DateTimeFormatter; > import java.util.Properties; > > public class MysqlCDC2Hive { > > private static final DateTimeFormatter dtf = > DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); > > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment streamEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > streamEnv.setParallelism(3); > streamEnv.enableCheckpointing(60000); > > EnvironmentSettings tableEnvSettings = > EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(streamEnv, tableEnvSettings); > > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > CheckpointingMode.EXACTLY_ONCE); > > tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > Duration.ofMinutes(1)); > > String catalogName = "hive_catalog"; > HiveCatalog catalog = new HiveCatalog( > catalogName, > "default", > "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", > "2.3.4" > ); > tableEnv.registerCatalog(catalogName, catalog); > tableEnv.useCatalog(catalogName); > > MyDateFormat2 myDateFormat = new MyDateFormat2(); > tableEnv.registerFunction("my_date_format", myDateFormat); > > tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc"); > tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team"); > tableEnv.executeSql("CREATE TABLE cdc.team(\n" + > " team_id INT,\n" + > " team_name STRING,\n" + > " create_time TIMESTAMP,\n" + > " update_time TIMESTAMP,\n" + > " proctime as proctime()\n" + > ") WITH (\n" + > " 'connector' = 'mysql-cdc',\n" + > " 'hostname' = 'localhost',\n" + > " 'port' = '3306',\n" + > " 'username' = 'root',\n" + > " 'password' = 'root',\n" + > " 'database-name' = 'test',\n" + > " 'table-name' = 'team'\n" + > ")"); > > tableEnv.executeSql("CREATE 
DATABASE IF NOT EXISTS kafka"); > tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team"); > tableEnv.executeSql("CREATE TABLE kafka.team (\n" + > " team_id INT,\n" + > " team_name STRING,\n" + > " create_time TIMESTAMP,\n" + > " update_time TIMESTAMP\n" + > ") WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'topic' = 'team',\n" + > " 'scan.startup.mode' = 'earliest-offset',\n" + > " 'properties.bootstrap.servers' = 'localhost:9092',\n" + > " 'format' = 'changelog-json'\n" + > ")"); > > tableEnv.executeSql("INSERT INTO kafka.team \n" + > "SELECT team_id, team_name, create_time, update_time \n" + > "FROM cdc.team"); > > // 定义带op字段的stream > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", "localhost:9092"); > properties.setProperty("group.id", "test1`"); > > FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>( > "team", > new SimpleStringSchema(), > properties > ).setStartFromEarliest(); > > DataStream<String> ds = streamEnv.addSource(consumer); > > String[] fieldNames = {"team_id", "team_name", "create_time", > "update_time", "op"}; > TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, > Types.STRING, Types.STRING}; > DataStream<Row> ds2 = ds.map(str -> { > JSONObject jsonObject = JSON.parseObject(str); > String op = jsonObject.getString("op"); > JSONObject data = jsonObject.getJSONObject("data"); > int arity = fieldNames.length; > Row row = new Row(arity); > row.setField(0, data.get("team_id")); > row.setField(1, data.get("team_name")); > row.setField(2, data.get("create_time")); > row.setField(3, data.get("update_time")); > String operation = getOperation(op); > row.setField(4, operation); > > return row; > }, new RowTypeInfo(types, fieldNames)) > > > > > > > > > *.assignTimestampsAndWatermarks(new > BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) { > @Override public long extractTimestamp(Row row) { > String dt = (String) row.getField(2); LocalDateTime ldt = > LocalDateTime.parse(dt, dtf); Instant instant = > ldt.atZone(ZoneId.systemDefault()).toInstant(); long > timeInMillis = instant.toEpochMilli(); return timeInMillis; > } });* > > tableEnv.registerDataStream("merged_team", ds2); > > tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods"); > tableEnv.executeSql("DROP TABLE IF EXISTS ods.team"); > > tableEnv.executeSql("CREATE TABLE ods.team (\n" + > " team_id INT,\n" + > " team_name STRING,\n" + > " create_time STRING,\n" + > " update_time STRING,\n" + > " op STRING\n" + > ") PARTITIONED BY (\n" + > " dt STRING,\n" + > " hr STRING,\n" + > " mi STRING\n" + > ") STORED AS PARQUET TBLPROPERTIES (\n" + > " 'sink.partition-commit.trigger' = 'partition-time',\n" + > " 'sink.partition-commit.delay' = '1 min',\n" + > " 'sink.partition-commit.policy.kind' = > 'metastore,success-file',\n" + > " 'partition.time-extractor.timestamp-pattern' = '$dt > $hr:$mi:00'\n" + > ")"); > > tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > tableEnv.executeSql("INSERT INTO ods.team \n" + > "SELECT team_id, team_name, create_time, update_time, op, > \n" + > " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > HH:mm:ss'), 'yyyyMMdd') as dt, \n" + > " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > HH:mm:ss'), 'HH') as hr, \n" + > " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd > HH:mm:ss'), 'mm') as mi \n" + > "FROM merged_team"); > tableEnv.execute("MysqlCDC2Hive2"); > > streamEnv.execute(""); > } > > private static String getOperation(String op) { > String 
operation = "INSERT"; > for (RowKind rk : RowKind.values()) { > if (rk.shortString().equals(op)) { > switch (rk) { > case UPDATE_BEFORE: > case UPDATE_AFTER: > operation = "UPDATE"; > break; > case DELETE: > operation = "DELETE"; > break; > case INSERT: > default: > operation = "INSERT"; > break; > } > break; > } > } > return operation; > } > } > > Jark Wu <[email protected]> 于2020年11月1日周日 上午11:04写道: > >> 你检查一下 hive 文件是否正常生成了? >> >> 我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger >> policy 是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。 >> >> Best, >> Jark >> >> >> [1]: >> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger >> >> On Sat, 31 Oct 2020 at 17:25, 陈帅 <[email protected]> wrote: >> >>> 谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive >>> shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗? >>> >>> cdc -> kafka示例消息如下 >>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 >>> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"} >>> >>> import com.alibaba.fastjson.JSON; >>> import com.alibaba.fastjson.JSONObject; >>> import org.apache.flink.api.common.serialization.SimpleStringSchema; >>> import org.apache.flink.api.common.typeinfo.TypeInformation; >>> import org.apache.flink.api.common.typeinfo.Types; >>> import org.apache.flink.api.java.typeutils.RowTypeInfo; >>> import org.apache.flink.streaming.api.CheckpointingMode; >>> import org.apache.flink.streaming.api.TimeCharacteristic; >>> import org.apache.flink.streaming.api.datastream.DataStream; >>> import >>> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; >>> import >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >>> import >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; >>> import org.apache.flink.table.api.EnvironmentSettings; >>> import org.apache.flink.table.api.SqlDialect; >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >>> import org.apache.flink.table.catalog.hive.HiveCatalog; >>> import org.apache.flink.types.Row; >>> import org.apache.flink.types.RowKind; >>> >>> import java.time.Duration; >>> import java.util.Properties; >>> >>> public class MysqlCDC2Hive { >>> public static void main(String[] args) throws Exception { >>> StreamExecutionEnvironment streamEnv = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> >>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >>> streamEnv.setParallelism(3); >>> streamEnv.enableCheckpointing(60000); >>> >>> EnvironmentSettings tableEnvSettings = >>> EnvironmentSettings.newInstance() >>> .useBlinkPlanner() >>> .inStreamingMode() >>> .build(); >>> StreamTableEnvironment tableEnv = >>> StreamTableEnvironment.create(streamEnv, tableEnvSettings); >>> >>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, >>> CheckpointingMode.EXACTLY_ONCE); >>> >>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, >>> Duration.ofMinutes(1)); >>> >>> String catalogName = "hive_catalog"; >>> HiveCatalog catalog = new HiveCatalog( >>> catalogName, >>> "default", >>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf", >>> "2.3.4" >>> ); >>> tableEnv.registerCatalog(catalogName, catalog); >>> tableEnv.useCatalog(catalogName); >>> >>> MyDateFormat2 myDateFormat = new MyDateFormat2(); 
>>>         tableEnv.registerFunction("my_date_format", myDateFormat);
>>>
>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
>>>         tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
>>>         tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
>>>                 "    team_id INT,\n" +
>>>                 "    team_name STRING,\n" +
>>>                 "    create_time TIMESTAMP,\n" +
>>>                 "    update_time TIMESTAMP,\n" +
>>>                 "    proctime as proctime()\n" +
>>>                 ") WITH (\n" +
>>>                 "    'connector' = 'mysql-cdc',\n" +
>>>                 "    'hostname' = 'localhost',\n" +
>>>                 "    'port' = '3306',\n" +
>>>                 "    'username' = 'root',\n" +
>>>                 "    'password' = 'root',\n" +
>>>                 "    'database-name' = 'test',\n" +
>>>                 "    'table-name' = 'team'\n" +
>>>                 ")");
>>>
>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
>>>         tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
>>>         tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
>>>                 "    team_id INT,\n" +
>>>                 "    team_name STRING,\n" +
>>>                 "    create_time TIMESTAMP,\n" +
>>>                 "    update_time TIMESTAMP\n" +
>>>                 ") WITH (\n" +
>>>                 "    'connector' = 'kafka',\n" +
>>>                 "    'topic' = 'team',\n" +
>>>                 "    'scan.startup.mode' = 'earliest-offset',\n" +
>>>                 "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
>>>                 "    'format' = 'changelog-json'\n" +
>>>                 ")");
>>>
>>>         tableEnv.executeSql("INSERT INTO kafka.team \n" +
>>>                 "SELECT team_id, team_name, create_time, update_time \n" +
>>>                 "FROM cdc.team");
>>>
>>>         // define the stream that carries the op field
>>>         Properties properties = new Properties();
>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>         properties.setProperty("group.id", "test");
>>>
>>>         FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
>>>                 "team",
>>>                 new SimpleStringSchema(),
>>>                 properties
>>>         ).setStartFromEarliest();
>>>
>>>         DataStream<String> ds = streamEnv.addSource(consumer);
>>>
>>>         String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
>>>         TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
>>>         DataStream<Row> ds2 = ds.map(str -> {
>>>             JSONObject jsonObject = JSON.parseObject(str);
>>>             String op = jsonObject.getString("op");
>>>             JSONObject data = jsonObject.getJSONObject("data");
>>>             int arity = fieldNames.length;
>>>             Row row = new Row(arity);
>>>             row.setField(0, data.get("team_id"));
>>>             row.setField(1, data.get("team_name"));
>>>             row.setField(2, data.get("create_time"));
>>>             row.setField(3, data.get("update_time"));
>>>             String operation = getOperation(op);
>>>             row.setField(4, operation);
>>>             return row;
>>>         }, new RowTypeInfo(types, fieldNames));
>>>
>>>         tableEnv.registerDataStream("merged_team", ds2);
>>>
>>>         tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>>>
>>>         tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
>>>         tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
>>>
>>>         tableEnv.executeSql("CREATE TABLE ods.team (\n" +
>>>                 "    team_id INT,\n" +
>>>                 "    team_name STRING,\n" +
>>>                 "    create_time STRING,\n" +
>>>                 "    update_time STRING,\n" +
>>>                 "    op STRING\n" +
>>> //              ") PARTITIONED BY (\n" +
>>> //              "    ts_date STRING,\n" +
>>> //              "    ts_hour STRING,\n" +
>>> //              "    ts_minute STRING\n" +
>>>                 ") STORED AS PARQUET TBLPROPERTIES (\n" +
>>>                 "    'sink.partition-commit.trigger' = 'partition-time',\n" +
>>>                 "    'sink.partition-commit.delay' = '1 min',\n" +
>>>                 "    'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
>>>                 "    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
>>>                 ")");
>>>
>>>         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>>>         tableEnv.executeSql("INSERT INTO ods.team \n" +
>>>                 "SELECT team_id, team_name, create_time, update_time, op \n" +
>>> //              "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" +
>>> //              "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as ts_hour, \n" +
>>> //              "    DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as ts_minute \n" +
>>>                 "FROM merged_team");
>>>         tableEnv.execute("MysqlCDC2Hive2");
>>>
>>>         streamEnv.execute("");
>>>     }
>>>
>>>     private static String getOperation(String op) {
>>>         String operation = "INSERT";
>>>         for (RowKind rk : RowKind.values()) {
>>>             if (rk.shortString().equals(op)) {
>>>                 switch (rk) {
>>>                     case UPDATE_BEFORE:
>>>                     case UPDATE_AFTER:
>>>                         operation = "UPDATE";
>>>                         break;
>>>                     case DELETE:
>>>                         operation = "DELETE";
>>>                         break;
>>>                     case INSERT:
>>>                     default:
>>>                         operation = "INSERT";
>>>                         break;
>>>                 }
>>>                 break;
>>>             }
>>>         }
>>>         return operation;
>>>     }
>>> }
>>>
>>> On Sat, Oct 31, 2020 at 1:45 PM, Jark Wu <[email protected]> wrote:
>>>
>>>> 1. Yes. Hive currently cannot consume a changelog directly, mainly because
>>>> Hive's support for CDC is not very good. Even the Hive ACID/transaction
>>>> feature is rarely used, since it does not integrate well with other
>>>> compute engines.
>>>>
>>>> 2. The cdc -> kafka -> hive streaming approach is feasible, but
>>>> kafka -> hive streaming is just a raw-data sync: what lands in Hive is
>>>> still the CDC log content, not merged in real time, so you have to write
>>>> your own query in Hive to do the merge. See this article [1] for the
>>>> merge process.
>>>>
>>>> 3. You can use ts + INTERVAL '8' HOUR.
>>>>
>>>> PS: In 1.12 we plan to let Hive write changelog data directly, so that
>>>> cdc -> hive streaming will work without the intermediate Kafka. Once the
>>>> data is in Hive, though, you still need a separate query to merge it in
>>>> real time.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[email protected]> wrote:
>>>>
>>>>> Hive 3 can, Hive 2 cannot; switching to Kafka probably won't help either.
>>>>> Before Hive 3, warehouse tables generally did not support updates. Not
>>>>> sure whether this answer is correct; corrections welcome.
>>>>>
>>>>> 罗显宴
>>>>> Email: [email protected]
>>>>>
>>>>> On Oct 31, 2020 at 12:06, 陈帅 wrote:
>>>>> I want to use the Flink SQL mysql-cdc connector to sync MySQL table data
>>>>> directly into Hive in real time, but running it throws:
>>>>>
>>>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>>>> AppendStreamTableSink doesn't support consuming update and delete changes
>>>>> which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
>>>>> team]], fields=[team_id, team_name, create_time, update_time])
>>>>>
>>>>> My questions:
>>>>> 1. Is this because Hive 2 does not support delete/update? Would switching
>>>>> to Hive 3 make it work?
>>>>> 2. To support this scenario, do I need to add a Kafka layer in between
>>>>> (using the changelog-json format), i.e. cdc -> kafka, then
>>>>> kafka -> hive streaming? Thanks!
>>>>> 3. The time produced by DATE_FORMAT is in UTC. How do I convert it to
>>>>> GMT+8? Is a UDF the only way?
>>>>>
>>>>> The SQL statements are as follows:
>>>>>
>>>>> CREATE DATABASE IF NOT EXISTS cdc
>>>>>
>>>>> DROP TABLE IF EXISTS cdc.team
>>>>>
>>>>> CREATE TABLE team(
>>>>>     team_id BIGINT,
>>>>>     team_name STRING,
>>>>>     create_time TIMESTAMP,
>>>>>     update_time TIMESTAMP,
>>>>>     proctime as proctime()
>>>>> ) WITH (
>>>>>     'connector' = 'mysql-cdc',
>>>>>     'hostname' = 'localhost',
>>>>>     'port' = '3306',
>>>>>     'username' = 'root',
>>>>>     'password' = 'root',
>>>>>     'database-name' = 'test',
>>>>>     'table-name' = 'team'
>>>>> )
>>>>>
>>>>> CREATE DATABASE IF NOT EXISTS ods
>>>>>
>>>>> DROP TABLE IF EXISTS ods.team
>>>>>
>>>>> CREATE TABLE ods.team (
>>>>>     team_id BIGINT,
>>>>>     team_name STRING,
>>>>>     create_time TIMESTAMP,
>>>>>     update_time TIMESTAMP
>>>>> ) PARTITIONED BY (
>>>>>     ts_date STRING,
>>>>>     ts_hour STRING,
>>>>>     ts_minute STRING
>>>>> ) STORED AS PARQUET TBLPROPERTIES (
>>>>>     'sink.partition-commit.trigger' = 'partition-time',
>>>>>     'sink.partition-commit.delay' = '1 min',
>>>>>     'sink.partition-commit.policy.kind' = 'metastore,success-file',
>>>>>     'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
>>>>> )
>>>>>
>>>>> INSERT INTO ods.team
>>>>> SELECT team_id, team_name, create_time, update_time,
>>>>>     my_date_format(create_time, 'yyyy-MM-dd', 'Asia/Shanghai'),
>>>>>     my_date_format(create_time, 'HH', 'Asia/Shanghai'),
>>>>>     my_date_format(create_time, 'mm', 'Asia/Shanghai')
>>>>> FROM cdc.team
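A minimal Flink SQL sketch of the "ts + INTERVAL '8' HOUR" approach Jark suggests above for question 3, replacing the my_date_format UDF in the INSERT; this assumes create_time arrives as a UTC TIMESTAMP as in the DDL and has not been verified against this setup:

    -- Shift the UTC timestamp by 8 hours before formatting, instead of a UDF.
    INSERT INTO ods.team
    SELECT team_id, team_name, create_time, update_time,
        DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'yyyy-MM-dd') AS ts_date,
        DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'HH') AS ts_hour,
        DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'mm') AS ts_minute
    FROM cdc.team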
