Hello, my test code is as follows:

private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
        " `data` VARCHAR , " +
        " `table` VARCHAR " +
        ") WITH (" +
        " 'connector' = 'kafka'," +
        " 'topic' = 'source_databases'," +
        " 'properties.bootstrap.servers' = '***'," +
        " 'properties.group.id' = 'real1'," +
        " 'format' = 'json'," +
        " 'scan.startup.mode' = 'earliest-offset'" +
        ")";

public static void main(String[] args) throws Exception {
    // blink planner table environment
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

    TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
    tableResult.print();

    Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
    bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);

    bsEnv.execute("aa");
}

The output is as follows; `data` is always empty. The data is a MySQL binlog parsed by canal.
,order_operation_time
,inventory_batch_log
,order_log
,order_address_book
,product_inventory
,order_physical_relation
,bil_business_attach
,picking_detail
,picking_detail
,orders
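A likely cause (an assumption on my part, not verified against your topic): in canal's JSON envelope, `data` is a JSON *array* of row objects, while `table` is a plain string. With 'format' = 'json', a column declared VARCHAR cannot be parsed from a JSON array, so it comes back NULL. A minimal sketch of a DDL that declares the nested type instead; the inner field names `id` and `name` are hypothetical placeholders for your real columns:

```java
// Sketch only: assumes canal's JSON shape where `data` is an array of row
// objects. The inner field names (`id`, `name`) are hypothetical placeholders;
// replace them with the actual columns of your MySQL table.
public class CanalDdlSketch {
    static final String KAFKA_SQL =
            "CREATE TABLE kafkaTable (\n" +
            " `data` ARRAY<ROW<`id` VARCHAR, `name` VARCHAR>>, " +
            " `table` VARCHAR " +
            ") WITH (" +
            " 'connector' = 'kafka'," +
            " 'topic' = 'source_databases'," +
            " 'properties.bootstrap.servers' = '***'," +
            " 'properties.group.id' = 'real1'," +
            " 'format' = 'json'," +
            " 'scan.startup.mode' = 'earliest-offset'" +
            ")";

    public static void main(String[] args) {
        System.out.println(KAFKA_SQL);
    }
}
```

Alternatively, Flink 1.11 ships a 'canal-json' format that understands the canal envelope directly and exposes the row columns as the table's own columns, so you would not declare `data`/`table` yourself at all.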

One more question: in version 1.11, can the blink planner not convert a DataStream to a Table?
All the examples I have seen use useOldPlanner for that conversion.
Thanks.
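On the planner question: as far as I know, the blink planner in 1.11 does support DataStream <-> Table conversion; the StreamTableEnvironment created with useBlinkPlanner() exposes fromDataStream and toAppendStream just like the old planner. A minimal sketch (standard 1.11 APIs assumed, not run here):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class BlinkDataStreamToTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStream<String> stream = env.fromElements("a", "b", "c");
        // DataStream -> Table works on the blink planner too.
        Table table = tEnv.fromDataStream(stream);
        // Table -> DataStream.
        tEnv.toAppendStream(table, Row.class).print();
        env.execute("blink-datastream-to-table");
    }
}
```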

On 2020-07-27 19:44:10, "Jark Wu" <imj...@gmail.com> wrote:
>Sorry, I still don't see the attachment.
>If it is plain text, you can paste it directly into the email.
>
>On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote:
>
>> I'm uploading it again.
>>
>> On 2020-07-27 18:55, Jark Wu <imj...@gmail.com> wrote:
>>
>> Hi,
>> Your attachment doesn't seem to have been uploaded.
>>
>> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote:
>>
>> > *Hello. This is my parsing SQL. I want to read the data field and the table field from the binlog. Why can I get table but not data?*
>> >
>> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> >         " `data` VARCHAR , " +
>> >         " `table` VARCHAR " +
>> >         ") WITH (" +
>> >         " 'connector' = 'kafka'," +
>> >         " 'topic' = 'order_source'," +
>> >         " 'properties.bootstrap.servers' = '***'," +
>> >         " 'properties.group.id' = 'real1'," +
>> >         " 'format' = 'json'," +
>> >         " 'scan.startup.mode' = 'earliest-offset'" +
>> >         ")";
>> >
>> >
>> > See the attachment for details; it includes the printed output.
>> >
>> >
>> >
>> >
>> >
>>
>>
