I checked: the Hive files are indeed generated, following the partitions I defined. Per your suggestion I added a watermark on the ds2 stream; after running, the Hive files are generated as before, but I still cannot query any data through the hive shell.
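
(A debugging sketch, not something verified in this thread: running SHOW PARTITIONS ods.team in the hive shell should tell the two cases apart. If it returns nothing while the partition directories exist on disk, the partitions were written but never committed to the metastore, which points at the partition-commit configuration rather than the write path.)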
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
public class MysqlCDC2Hive {
    private static final DateTimeFormatter dtf =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        streamEnv.setParallelism(3);
        streamEnv.enableCheckpointing(60000);

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv =
                StreamTableEnvironment.create(streamEnv, tableEnvSettings);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
                CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
                Duration.ofMinutes(1));
        String catalogName = "hive_catalog";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,
                "default",
                "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
                "2.3.4"
        );
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        MyDateFormat2 myDateFormat = new MyDateFormat2();
        tableEnv.registerFunction("my_date_format", myDateFormat);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
        tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
        tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
                " team_id INT,\n" +
                " team_name STRING,\n" +
                " create_time TIMESTAMP,\n" +
                " update_time TIMESTAMP,\n" +
                " proctime as proctime()\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'database-name' = 'test',\n" +
                " 'table-name' = 'team'\n" +
                ")");

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
        tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
        tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
                " team_id INT,\n" +
                " team_name STRING,\n" +
                " create_time TIMESTAMP,\n" +
                " update_time TIMESTAMP\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'team',\n" +
                " 'scan.startup.mode' = 'earliest-offset',\n" +
                " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
                " 'format' = 'changelog-json'\n" +
                ")");

        tableEnv.executeSql("INSERT INTO kafka.team \n" +
                "SELECT team_id, team_name, create_time, update_time \n" +
                "FROM cdc.team");
        // Define a stream that carries the op field
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test1");

        FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
                "team",
                new SimpleStringSchema(),
                properties
        ).setStartFromEarliest();

        DataStream<String> ds = streamEnv.addSource(consumer);

        String[] fieldNames = {"team_id", "team_name", "create_time", "update_time", "op"};
        TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING};
        DataStream<Row> ds2 = ds.map(str -> {
            JSONObject jsonObject = JSON.parseObject(str);
            String op = jsonObject.getString("op");
            JSONObject data = jsonObject.getJSONObject("data");
            int arity = fieldNames.length;
            Row row = new Row(arity);
            row.setField(0, data.get("team_id"));
            row.setField(1, data.get("team_name"));
            row.setField(2, data.get("create_time"));
            row.setField(3, data.get("update_time"));
            String operation = getOperation(op);
            row.setField(4, operation);
            return row;
        }, new RowTypeInfo(types, fieldNames))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.minutes(1)) {
                            @Override
                            public long extractTimestamp(Row row) {
                                String dt = (String) row.getField(2);
                                LocalDateTime ldt = LocalDateTime.parse(dt, dtf);
                                Instant instant = ldt.atZone(ZoneId.systemDefault()).toInstant();
                                return instant.toEpochMilli();
                            }
                        });
        tableEnv.registerDataStream("merged_team", ds2);

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
        tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
        tableEnv.executeSql("CREATE TABLE ods.team (\n" +
                " team_id INT,\n" +
                " team_name STRING,\n" +
                " create_time STRING,\n" +
                " update_time STRING,\n" +
                " op STRING\n" +
                ") PARTITIONED BY (\n" +
                " dt STRING,\n" +
                " hr STRING,\n" +
                " mi STRING\n" +
                ") STORED AS PARQUET TBLPROPERTIES (\n" +
                " 'sink.partition-commit.trigger' = 'partition-time',\n" +
                " 'sink.partition-commit.delay' = '1 min',\n" +
                " 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                " 'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mi:00'\n" +
                ")");
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tableEnv.executeSql("INSERT INTO ods.team \n" +
                "SELECT team_id, team_name, create_time, update_time, op, \n" +
                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as dt, \n" +
                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as hr, \n" +
                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as mi \n" +
                "FROM merged_team");
        tableEnv.execute("MysqlCDC2Hive2");

        streamEnv.execute("");
    }
    // Map a changelog-json op short string ("+I", "-U", "+U", "-D") to an operation name.
    private static String getOperation(String op) {
        String operation = "INSERT";
        for (RowKind rk : RowKind.values()) {
            if (rk.shortString().equals(op)) {
                switch (rk) {
                    case UPDATE_BEFORE:
                    case UPDATE_AFTER:
                        operation = "UPDATE";
                        break;
                    case DELETE:
                        operation = "DELETE";
                        break;
                    case INSERT:
                    default:
                        operation = "INSERT";
                        break;
                }
                break;
            }
        }
        return operation;
    }
}
Jark Wu <[email protected]> wrote on Sun, Nov 1, 2020 at 11:04 AM:
> Could you check whether the Hive files are actually being generated?
>
> Looking at your code above, the kafka -> hive part of the pipeline has no
> watermark, while the "partition-time" trigger policy is driven by
> watermarks, so that is probably why no data shows up in Hive.
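>
> As a minimal sketch of what adding one could look like (assuming
> create_time is the event-time column and is declared as TIMESTAMP(3)),
> a watermark can be declared directly in the Kafka table DDL, so the
> kafka -> hive pipeline itself carries watermarks:
>
> tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
>         " team_id INT,\n" +
>         " team_name STRING,\n" +
>         " create_time TIMESTAMP(3),\n" +
>         " update_time TIMESTAMP(3),\n" +
>         " WATERMARK FOR create_time AS create_time - INTERVAL '1' MINUTE\n" +
>         ") WITH (\n" +
>         " 'connector' = 'kafka',\n" +
>         " 'topic' = 'team',\n" +
>         " 'scan.startup.mode' = 'earliest-offset',\n" +
>         " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
>         " 'format' = 'changelog-json'\n" +
>         ")");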
>
> Best,
> Jark
>
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger
>
> On Sat, 31 Oct 2020 at 17:25, 陈帅 <[email protected]> wrote:
>
>> Thanks for the detailed answer, Jark. I tried it along the lines you
>> suggested. The problem I ran into: without Hive partitioning, writing and
>> reading both work fine, but once time-based partitioning is enabled on the
>> Hive table, the write succeeds yet no data can be queried through the hive
>> shell, even though the table schema is correct. (I have commented out the
>> partitioning code.) Could you help me see where it went wrong?
>>
>> A sample cdc -> kafka message looks like this:
>> {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31
>> 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}
>>
>> import com.alibaba.fastjson.JSON;
>> import com.alibaba.fastjson.JSONObject;
>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.Types;
>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> import org.apache.flink.streaming.api.CheckpointingMode;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.SqlDialect;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>> import org.apache.flink.types.Row;
>> import org.apache.flink.types.RowKind;
>>
>> import java.time.Duration;
>> import java.util.Properties;
>>
>> public class MysqlCDC2Hive {
>> public static void main(String[] args) throws Exception {
>> StreamExecutionEnvironment streamEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> streamEnv.setParallelism(3);
>> streamEnv.enableCheckpointing(60000);
>>
>> EnvironmentSettings tableEnvSettings =
>> EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .build();
>> StreamTableEnvironment tableEnv =
>> StreamTableEnvironment.create(streamEnv, tableEnvSettings);
>>
>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>> CheckpointingMode.EXACTLY_ONCE);
>>
>> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>> Duration.ofMinutes(1));
>>
>> String catalogName = "hive_catalog";
>> HiveCatalog catalog = new HiveCatalog(
>> catalogName,
>> "default",
>> "/Users/chenshuai/dev/apache-hive-2.3.4-bin/conf",
>> "2.3.4"
>> );
>> tableEnv.registerCatalog(catalogName, catalog);
>> tableEnv.useCatalog(catalogName);
>>
>> MyDateFormat2 myDateFormat = new MyDateFormat2();
>> tableEnv.registerFunction("my_date_format", myDateFormat);
>>
>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS cdc");
>> tableEnv.executeSql("DROP TABLE IF EXISTS cdc.team");
>> tableEnv.executeSql("CREATE TABLE cdc.team(\n" +
>> " team_id INT,\n" +
>> " team_name STRING,\n" +
>> " create_time TIMESTAMP,\n" +
>> " update_time TIMESTAMP,\n" +
>> " proctime as proctime()\n" +
>> ") WITH (\n" +
>> " 'connector' = 'mysql-cdc',\n" +
>> " 'hostname' = 'localhost',\n" +
>> " 'port' = '3306',\n" +
>> " 'username' = 'root',\n" +
>> " 'password' = 'root',\n" +
>> " 'database-name' = 'test',\n" +
>> " 'table-name' = 'team'\n" +
>> ")");
>>
>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS kafka");
>> tableEnv.executeSql("DROP TABLE IF EXISTS kafka.team");
>> tableEnv.executeSql("CREATE TABLE kafka.team (\n" +
>> " team_id INT,\n" +
>> " team_name STRING,\n" +
>> " create_time TIMESTAMP,\n" +
>> " update_time TIMESTAMP\n" +
>> ") WITH (\n" +
>> " 'connector' = 'kafka',\n" +
>> " 'topic' = 'team',\n" +
>> " 'scan.startup.mode' = 'earliest-offset',\n" +
>> " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
>> " 'format' = 'changelog-json'\n" +
>> ")");
>>
>> tableEnv.executeSql("INSERT INTO kafka.team \n" +
>> "SELECT team_id, team_name, create_time, update_time \n" +
>> "FROM cdc.team");
>>
>>         // Define a stream that carries the op field
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> properties.setProperty("group.id", "test");
>>
>>         FlinkKafkaConsumerBase<String> consumer = new FlinkKafkaConsumer<>(
>> "team",
>> new SimpleStringSchema(),
>> properties
>> ).setStartFromEarliest();
>>
>> DataStream<String> ds = streamEnv.addSource(consumer);
>>
>> String[] fieldNames = {"team_id", "team_name", "create_time",
>> "update_time", "op"};
>> TypeInformation[] types = {Types.INT, Types.STRING, Types.STRING,
>> Types.STRING, Types.STRING};
>> DataStream<Row> ds2 = ds.map(str -> {
>> JSONObject jsonObject = JSON.parseObject(str);
>> String op = jsonObject.getString("op");
>> JSONObject data = jsonObject.getJSONObject("data");
>> int arity = fieldNames.length;
>> Row row = new Row(arity);
>> row.setField(0, data.get("team_id"));
>> row.setField(1, data.get("team_name"));
>> row.setField(2, data.get("create_time"));
>> row.setField(3, data.get("update_time"));
>> String operation = getOperation(op);
>> row.setField(4, operation);
>>
>> return row;
>> }, new RowTypeInfo(types, fieldNames));
>>
>> tableEnv.registerDataStream("merged_team", ds2);
>>
>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>>
>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS ods");
>> tableEnv.executeSql("DROP TABLE IF EXISTS ods.team");
>>
>> tableEnv.executeSql("CREATE TABLE ods.team (\n" +
>> " team_id INT,\n" +
>> " team_name STRING,\n" +
>> " create_time STRING,\n" +
>> " update_time STRING,\n" +
>> " op STRING\n" +
>> // ") PARTITIONED BY (\n" +
>> // " ts_date STRING,\n" +
>> // " ts_hour STRING,\n" +
>> // " ts_minute STRING\n" +
>> ") STORED AS PARQUET TBLPROPERTIES (\n" +
>>             " 'sink.partition-commit.trigger' = 'partition-time',\n" +
>>             " 'sink.partition-commit.delay' = '1 min',\n" +
>>             " 'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
>>             " 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
>> ")");
>>
>> tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>> tableEnv.executeSql("INSERT INTO ods.team \n" +
>>                 "SELECT team_id, team_name, create_time, update_time, op \n" +
>> //                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyyMMdd') as ts_date, \n" +
>> //                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'HH') as ts_hour, \n" +
>> //                " DATE_FORMAT(TO_TIMESTAMP(create_time, 'yyyy-MM-dd HH:mm:ss'), 'mm') as ts_minute \n" +
>> "FROM merged_team");
>> tableEnv.execute("MysqlCDC2Hive2");
>>
>> streamEnv.execute("");
>> }
>>
>> private static String getOperation(String op) {
>> String operation = "INSERT";
>> for (RowKind rk : RowKind.values()) {
>> if (rk.shortString().equals(op)) {
>> switch (rk) {
>> case UPDATE_BEFORE:
>> case UPDATE_AFTER:
>> operation = "UPDATE";
>> break;
>> case DELETE:
>> operation = "DELETE";
>> break;
>> case INSERT:
>> default:
>> operation = "INSERT";
>> break;
>> }
>> break;
>> }
>> }
>> return operation;
>> }
>> }
>>
>> Jark Wu <[email protected]> wrote on Sat, Oct 31, 2020 at 1:45 PM:
>>
>>> 1. Yes. Hive currently cannot consume changelogs directly, mainly because
>>> Hive's CDC support is weak. Even the Hive ACID/transaction feature is
>>> rarely used, since it integrates poorly with other compute engines.
>>>
>>> 2. The cdc -> kafka -> hive streaming approach works, but kafka -> hive
>>> streaming amounts to raw data synchronization: what lands in Hive is still
>>> the raw cdc log content, with no real-time merging. You have to write your
>>> own query in Hive to do the merge; the merge process is described in [1].
>>>
>>> 3. You can use ts + INTERVAL '8' HOUR, e.g. as sketched below.
>>>
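>>> For example, a sketch that shifts the UTC-based value to GMT+8 before
>>> formatting (assuming create_time is the column in question):
>>>
>>>     DATE_FORMAT(create_time + INTERVAL '8' HOUR, 'yyyy-MM-dd') as ts_date
>>>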
>>> PS: In 1.12 we plan to let Hive write changelog data directly, so cdc can
>>> go straight into hive streaming without Kafka in between. Once the data is
>>> in Hive, though, you still need a separate query to merge it in real time.
>>>
>>> Best,
>>> Jark
>>>
>>> On Sat, 31 Oct 2020 at 13:26, 罗显宴 <[email protected]> wrote:
>>>
>>>> Hive 3 can do this but Hive 2 cannot; switching to Kafka probably won't
>>>> help either. Before Hive 3, Hive generally did not support modifying
>>>> warehouse data. Not sure whether this answer is correct; corrections
>>>> welcome.
>>>>
>>>>
>>>> 罗显宴
>>>> Email: [email protected]
>>>>
>>>> On Oct 31, 2020 at 12:06, 陈帅 wrote:
>>>> I want to use Flink SQL's mysql-cdc connector to sync MySQL table data
>>>> straight into Hive in real time. Running it throws:
>>>>
>>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>>> AppendStreamTableSink doesn't support consuming update and delete
>>>> changes
>>>> which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
>>>> team]], fields=[team_id, team_name, create_time, update_time])
>>>>
>>>> My questions:
>>>> 1. Is this because Hive 2 does not support delete/update? Would switching
>>>> to Hive 3 make it work?
>>>> 2. To support this scenario, do I need to add a Kafka layer in between
>>>> (using the changelog-json format), i.e. cdc -> kafka, then kafka -> hive
>>>> streaming? Thanks!
>>>> 3. The time produced by DATE_FORMAT is UTC; how do I convert it to GMT+8?
>>>> Is a UDF the only way?
>>>>
>>>> The SQL statements are as follows:
>>>>
>>>> CREATE DATABASE IF NOT EXISTS cdc
>>>>
>>>> DROP TABLE IF EXISTS cdc.team
>>>>
>>>> CREATE TABLE team(
>>>> team_id BIGINT,
>>>> team_name STRING,
>>>> create_time TIMESTAMP,
>>>> update_time TIMESTAMP,
>>>> proctime as proctime()
>>>> ) WITH (
>>>> 'connector' = 'mysql-cdc',
>>>> 'hostname' = 'localhost',
>>>> 'port' = '3306',
>>>> 'username' = 'root',
>>>> 'password' = 'root',
>>>> 'database-name' = 'test',
>>>> 'table-name' = 'team'
>>>> )
>>>>
>>>> CREATE DATABASE IF NOT EXISTS ods
>>>>
>>>> DROP TABLE IF EXISTS ods.team
>>>>
>>>> CREATE TABLE ods.team (
>>>> team_id BIGINT,
>>>> team_name STRING,
>>>> create_time TIMESTAMP,
>>>> update_time TIMESTAMP
>>>> ) PARTITIONED BY (
>>>> ts_date STRING,
>>>> ts_hour STRING,
>>>> ts_minute STRING
>>>> ) STORED AS PARQUET TBLPROPERTIES (
>>>> 'sink.partition-commit.trigger' = 'partition-time',
>>>> 'sink.partition-commit.delay' = '1 min',
>>>> 'sink.partition-commit.policy.kind' = 'metastore,success-file',
>>>> 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
>>>> )
>>>>
>>>> INSERT INTO ods.team
>>>> SELECT team_id, team_name, create_time, update_time,
>>>> my_date_format(create_time,'yyyy-MM-dd', 'Asia/Shanghai'),
>>>> my_date_format(create_time,'HH', 'Asia/Shanghai'),
>>>> my_date_format(create_time,'mm', 'Asia/Shanghai')
>>>> FROM cdc.team
>>>>
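>>>> For reference, a sketch of what the my_date_format UDF could look like
>>>> (the actual MyDateFormat2 implementation is not shown here; the
>>>> (TIMESTAMP, pattern, zone) -> STRING signature is assumed from the call
>>>> sites above):
>>>>
>>>> import java.sql.Timestamp;
>>>> import java.time.ZoneId;
>>>> import java.time.format.DateTimeFormatter;
>>>> import org.apache.flink.table.functions.ScalarFunction;
>>>>
>>>> public class MyDateFormat2 extends ScalarFunction {
>>>>     // Format the instant in the given time zone, e.g.
>>>>     // my_date_format(create_time, 'yyyy-MM-dd', 'Asia/Shanghai')
>>>>     public String eval(Timestamp ts, String pattern, String zone) {
>>>>         if (ts == null) {
>>>>             return null;
>>>>         }
>>>>         return ts.toInstant()
>>>>                 .atZone(ZoneId.of(zone))
>>>>                 .format(DateTimeFormatter.ofPattern(pattern));
>>>>     }
>>>> }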
>>>