Hi

From the code, you are using a regular join between the Kafka source and the HBase source, and the HBase connector currently does not support streaming reads (it is a bounded source).
You can confirm this on the job dashboard: the HBase source tasks should change to FINISHED after running for a while, and Flink checkpointing currently does not support running checkpoints while there are FINISHED tasks.

You could consider rewriting the SQL to use a processing-time temporal join [1] against the HBase table; each record consumed from Kafka will then look up the current data in the HBase table at join time.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
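
For illustration, a minimal sketch of what the rewritten query could look like. The table names here follow the quoted code below (`main_table` stands for your per-loop view name, `dformfiled_join` for the HBase table DDL); the `proc_time` attribute is an assumption — the Kafka-backed view would need to declare a processing-time column, e.g. `proc_time AS PROCTIME()` in its schema:

```sql
-- Sketch only: assumes the Kafka-backed view exposes a processing-time
-- attribute named proc_time (declared as PROCTIME() in its schema).
-- The HBase table already has the required PRIMARY KEY (rowkey).
SELECT a.*, b.*
FROM main_table a
LEFT JOIN dformfiled_join FOR SYSTEM_TIME AS OF a.proc_time AS b
  ON a.key = b.rowkey;
```

With this form the HBase table is used as a lookup table, so there is no bounded HBase source task that finishes and blocks checkpointing.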

chang li <[email protected]> wrote on Fri, Sep 10, 2021 at 7:40 PM:

> Checkpointing was not enabled; enable it with:
> execEnv.enableCheckpointing(checkpointInterval);
>
> On 2021/09/10 07:41:10, "[email protected]" <[email protected]> wrote:
> > Hi:
> >     I have a question for the group: I am experimenting with joins on streams. I use FlinkKafkaConsumer to consume Kafka data as the source and then join it with an HBase dimension table. The join works, but the KafkaSource never performs a checkpoint, even though checkpointing is configured in the code. Is there any other configuration I need? The code is as follows:
> >
> > DataStream<String> kafkaSource = env.addSource(source);
> > Map<String, OutputTag<TmpTable>> sideOutStreamMap = new HashMap<>();
> > for (RowToColumnBean bean : lists) {
> >     OutputTag<TmpTable> app = new OutputTag<TmpTable>(bean.getMainTable()) {
> >     };
> >     sideOutStreamMap.put(bean.getMainTable(), app);
> > }
> >
> > RowToNumberProcessFunction rowToNumberProcessFunction = new RowToNumberProcessFunction(sideOutStreamMap, lists);
> > SingleOutputStreamOperator<TmpTable> process = kafkaSource.process(rowToNumberProcessFunction);
> >
> > EnvironmentSettings settings = EnvironmentSettings.newInstance()
> >         .useBlinkPlanner()
> >         .inStreamingMode()
> >         .build();
> >
> > StreamTableEnvironment tableEnv = StreamTableEnvironmentImpl.create(env, settings, new TableConfig());
> > // configure checkpointing
> > tableEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "10 s");
> >
> > for (RowToColumnBean bean : lists) {
> >     DataStream<TmpTable> dataStream = process.getSideOutput(sideOutStreamMap.get(bean.getMainTable()));
> >
> >     String mainTable = bean.getMainTable().split(" ")[0].split("\\.")[1].toLowerCase();
> >
> >     //Table tmpTable = tableEnv.fromDataStream(dataStream, StrUtil.list2Str(bean.getQueryColumns()));
> >
> >     tableEnv.createTemporaryView(mainTable, dataStream);
> >
> >     String joinTable = mainTable + "_join";
> >     tableEnv.executeSql("CREATE TABLE " + joinTable + "(\n" +
> >             "rowkey STRING,\n" +
> >             "info ROW<formid STRING>,\n" +
> >             "PRIMARY KEY (rowkey) NOT ENFORCED\n" +
> >             ") WITH (\n" +
> >             "'connector' = 'hbase-2.2',\n" +
> >             "'table-name' = 'hid0101_cache_his_dhcapp_nemrforms:dformfiled',\n" +
> >             "'zookeeper.quorum' = '192.168.0.115:2181',\n" +
> >             "'zookeeper.znode.parent' = '/hbase'\n" +
> >             ")");
> >
> >     // query the joined data
> >     //Table table = tableEnv.sqlQuery("select b.* from tmp a left join dformfiled b on a.key = b.rowkey");
> >     Table table = tableEnv.sqlQuery("select a.*,b.* from " + mainTable + " a left join " + joinTable + " b on a.key = lower(b.rowkey) and b.formid='550' where b.rowkey is not null");
> >
> >     TableSchema schema = table.getSchema();
> >     schema.getTableColumns().forEach(column -> {
> >         System.err.println(column.asSummaryString());
> >     });
> >
> >     DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
> >     tuple2DataStream.print(mainTable);
> >     dataStream.print(mainTable);
> > }
> >
> >
> > [email protected]
> >
>
