Hi all,

The flink-connector-kudu examples I've seen are all DataStream-based, but I want to use the DataSet API for point lookups. Judging from the error message, this doesn't seem to be supported. Is there any way to handle this?

The code is as follows:


import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.connectors.kudu.table.KuduCatalog;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Streaming environment: this path works.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);  // unused
StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env);

// Legacy batch (DataSet) environment: this path fails.
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);

String KUDU_MASTERS = "192.168.248.4:7051";
KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS);

tEnv2.registerCatalog("kudu", catalog);
tEnv2.useCatalog("kudu");

oldBatchTableEnv.registerCatalog("kudu", catalog);
oldBatchTableEnv.useCatalog("kudu");

// Batch query: throws the exception below when translated to a DataSet.
Table oldTable = oldBatchTableEnv.sqlQuery("select * from users");
DataSet<Row> dsRow = oldBatchTableEnv.toDataSet(oldTable, Row.class);
dsRow.print();

// Streaming query: works as expected.
Table table = tEnv2.sqlQuery("select * from users");
tEnv2.toAppendStream(table, Row.class).print();

The error is as follows:
ERROR StatusLogger No Log4j 2 configuration file found. Using default configuration (logging only errors to the console), or user programmatically provided configurations. Set system property 'log4j2.debug' to show Log4j 2 internal initialization logging. See https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions on how to configure Log4j 2
Exception in thread "main" org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.
    at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537)
    at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101)
    at com.hujiang.bi.order.OrderMasterJob.main(OrderMasterJob.java:44)


So it looks like flink-connector-kudu doesn't support batch (DataSet) reads...
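One workaround I'm thinking of trying (just a sketch, untested): instead of the legacy DataSet-based BatchTableEnvironment, run the same query through the unified TableEnvironment with the Blink planner in batch mode. This assumes Flink 1.11+ (for table.execute().print()) and that the Blink batch planner accepts the Kudu source as a bounded StreamTableSource, which I'm not sure about.

import org.apache.flink.connectors.kudu.table.KuduCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Unified TableEnvironment in batch mode (Blink planner) instead of the
// legacy DataSet-based BatchTableEnvironment.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment batchTableEnv = TableEnvironment.create(settings);

// Same catalog registration as in the streaming case above.
KuduCatalog catalog = new KuduCatalog("192.168.248.4:7051");
batchTableEnv.registerCatalog("kudu", catalog);
batchTableEnv.useCatalog("kudu");

// Bounded query; execute().print() collects the result to the client.
Table users = batchTableEnv.sqlQuery("select * from users");
users.execute().print();

Would this be a reasonable direction, or is there a supported way to do batch/point reads from Kudu with the DataSet API?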
