Hi:
I have a question I'd like to ask the group: I am working on a streaming join. I use FlinkKafkaConsumer to consume Kafka data as the source and then join it against an HBase dimension table. The join itself succeeds, but the KafkaSource never performs a checkpoint, even though checkpointing is configured in the code. Is there some other configuration I need to make? The code is as follows:
DataStream<String> kafkaSource = env.addSource(source);

Map<String, OutputTag<TmpTable>> sideOutStreamMap = new HashMap<>();
for (RowToColumnBean bean : lists) {
    OutputTag<TmpTable> app = new OutputTag<TmpTable>(bean.getMainTable()) {};
    sideOutStreamMap.put(bean.getMainTable(), app);
}

RowToNumberProcessFunction rowToNumberProcessFunction =
        new RowToNumberProcessFunction(sideOutStreamMap, lists);
SingleOutputStreamOperator<TmpTable> process =
        kafkaSource.process(rowToNumberProcessFunction);

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv =
        StreamTableEnvironmentImpl.create(env, settings, new TableConfig());

// Set checkpointing
tableEnv.getConfig().getConfiguration()
        .setString("execution.checkpointing.interval", "10 s");
for (RowToColumnBean bean : lists) {
    DataStream<TmpTable> dataStream =
            process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
    String mainTable = bean.getMainTable().split(" ")[0].split("\\.")[1].toLowerCase();

    //Table tmpTable = tableEnv.fromDataStream(dataStream, StrUtil.list2Str(bean.getQueryColumns()));
    tableEnv.createTemporaryView(mainTable, dataStream);

    String joinTable = mainTable + "_join";
    tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
            "rowkey STRING,\n" +
            "info ROW<formid STRING>,\n" +
            "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
            ") WITH (\n" +
            "'connector' = 'hbase-2.2',\n" +
            "'table-name' = 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
            "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
            "'zookeeper.znode.parent' = '/hbase'\n" +
            ")");
    // Query the data
    //Table table = tableEnv.sqlQuery("select b.* from tmp a left join dformfiled b on a.key = b.rowkey");
    Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a left join "
            + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550'"
            + " where b.rowkey is not null");

    TableSchema schema = table.getSchema();
    schema.getTableColumns().forEach(column ->
            System.err.println(column.asSummaryString()));

    DataStream<Tuple2<Boolean, Row>> tuple2DataStream =
            tableEnv.toRetractStream(table, Row.class);
    tuple2DataStream.print(mainTable);
    dataStream.print(mainTable);
}
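For completeness, the table-config entry above is the only checkpoint-related setting in my job. The other way I know of to turn checkpointing on is directly on the StreamExecutionEnvironment itself (a minimal sketch below, assuming the standard Flink DataStream API; the class name is just for illustration). I am not sure whether this is required in addition to, or instead of, the `execution.checkpointing.interval` table option:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing every 10 seconds on the DataStream environment;
        // this is what drives checkpoints for sources added via env.addSource(...).
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);
    }
}
```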
[email protected]