各位大佬 我看flink-connector-kudu的例子都是DataStream,但是我想用DataSet 进行点查。 看着提示好像不支持。 有什么办法处理么?
代码如下: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); StreamTableEnvironment tEnv2 = StreamTableEnvironment.create(env); ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv); String KUDU_MASTERS="192.168.248.4:7051"; KuduCatalog catalog = new KuduCatalog(KUDU_MASTERS); tEnv2.registerCatalog("kudu", catalog); tEnv2.useCatalog("kudu"); oldBatchTableEnv.registerCatalog("kudu", catalog); oldBatchTableEnv.useCatalog("kudu"); Table odlTable = oldBatchTableEnv.sqlQuery("select * from users"); DataSet<Row> dsRow = oldBatchTableEnv.toDataSet(odlTable, Row.class); dsRow.print(); Table table = tEnv2.sqlQuery("select * from users"); tEnv2.toAppendStream(table, Row.class).print(); 报错如下: ERROR StatusLogger No Log4j 2 configuration file found. Using default configuration (logging only errors to the console), or user programmatically provided configurations. Set system property 'log4j2.debug' to show Log4j 2 internal initialization logging. See https://logging.apache.org/log4j/2.x/manual/configuration.html for instructions on how to configure Log4j 2 Exception in thread "main" org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment. at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:116) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:580) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:555) at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:537) at org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:101) at com.hujiang.bi.order.OrderMasterJob.main(OrderMasterJob.java:44) 看着应该是 flink-connector-kudu不支持batch读了。。。