你的flink什么版本 发送自 Windows 10 版邮件应用
发件人: air23 发送时间: 2020年7月28日 15:36 收件人: user-zh@flink.apache.org 主题: Re:Re: Re: 解析kafka的mysql binlog问题 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 { "data":[ { "op_id":"97037138", "order_id":"84172164" } ], "database":"order_11", "es":1595720375000, "id":17469027, "isDdl":false, "mysqlType":{ "op_id":"int(11)", "order_id":"int(11)" }, "old":null, "pkNames":[ "op_id" ], "sql":"", "sqlType":{ "op_id":4, "order_id":4 }, "table":"order_product", "ts":1595720375837, "type":"INSERT" } 在 2020-07-28 14:44:35,"Jark Wu" <imj...@gmail.com> 写道: >有kafka 中json 数据的样例不? >有没有看过 TaskManager 中有没有异常 log 信息? > > > >On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote: > >> 你好 测试代码如下 >> >> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >> " `data` VARCHAR , " + >> " `table` VARCHAR " + >> ") WITH (" + >> " 'connector' = 'kafka'," + >> " 'topic' = 'source_databases'," + >> " 'properties.bootstrap.servers' = '***'," + >> " 'properties.group.id' = 'real1'," + >> " 'format' = 'json'," + >> " 'scan.startup.mode' = 'earliest-offset'" + >> ")"; >> public static void main(String[] args) throws Exception { >> >> >> //bink table >> StreamExecutionEnvironment bsEnv = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> EnvironmentSettings bsSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >> StreamTableEnvironment bsTableEnv = >> StreamTableEnvironment.create(bsEnv, bsSettings); >> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >> >> >> tableResult.print(); >> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >> >> bsEnv.execute("aa"); >> >> } >> >> >> >> >> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >> ,order_operation_time >> ,inventory_batch_log >> ,order_log >> ,order_address_book >> ,product_inventory >> ,order_physical_relation >> ,bil_business_attach >> ,picking_detail >> ,picking_detail >> ,orders >> >> >> >> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >> 看到例子都是useOldPlanner 来转table的。 >> 致谢 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道: >> >抱歉,还是没有看到附件。 >> >如果是文本的话,你可以直接贴到邮件里。 >> > >> >On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote: >> > >> >> 我再上传一次 >> >> >> >> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道: >> >> >> >> Hi, >> >> 你的附件好像没有上传。 >> >> >> >> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote: >> >> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* >> >> > >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" >> + >> >> > " `data` VARCHAR , " + >> >> > " `table` VARCHAR " + >> >> > ") WITH (" + >> >> > " 'connector' = 'kafka'," + >> >> > " 'topic' = 'order_source'," + >> >> > " 'properties.bootstrap.servers' = '***'," + >> >> > " 'properties.group.id' = 'real1'," + >> >> > " 'format' = 'json'," + >> >> > " 'scan.startup.mode' = 'earliest-offset'" + >> >> > ")"; >> >> > >> >> > >> >> > 具体见附件 有打印 >> >> > >> >> > >> >> > >> >> > >> >> > >> >> >> >> >>