你好 测试代码如下
private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + " `data` VARCHAR , " + " `table` VARCHAR " + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'source_databases'," + " 'properties.bootstrap.servers' = '***'," + " 'properties.group.id' = 'real1'," + " 'format' = 'json'," + " 'scan.startup.mode' = 'earliest-offset'" + ")"; public static void main(String[] args) throws Exception { //bink table StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); tableResult.print(); Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); bsEnv.execute("aa"); } 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog ,order_operation_time ,inventory_batch_log ,order_log ,order_address_book ,product_inventory ,order_physical_relation ,bil_business_attach ,picking_detail ,picking_detail ,orders 另外再问个问题。1.11版本 blink 不能datastream转table吗? 看到例子都是useOldPlanner 来转table的。 致谢 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道: >抱歉,还是没有看到附件。 >如果是文本的话,你可以直接贴到邮件里。 > >On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote: > >> 我再上传一次 >> >> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道: >> >> Hi, >> 你的附件好像没有上传。 >> >> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote: >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* >> > >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >> > " `data` VARCHAR , " + >> > " `table` VARCHAR " + >> > ") WITH (" + >> > " 'connector' = 'kafka'," + >> > " 'topic' = 'order_source'," + >> > " 'properties.bootstrap.servers' = '***'," + >> > " 'properties.group.id' = 'real1'," + >> > " 'format' = 'json'," + >> > " 'scan.startup.mode' = 'earliest-offset'" + >> > ")"; >> > >> > >> > 具体见附件 有打印 >> > >> > >> > >> > >> > >> >>