Re: 解析kafka的mysql binlog问题

2020-07-28 文章 admin
直接转成string1.11版本还不支持,会在1.12修复,参考jira[1]

[1]https://issues.apache.org/jira/browse/FLINK-18002 


> 2020年7月28日 下午5:20,air23  写道:
> 
> 你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致
> 另外想请教下 1.11 版本  datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的,
> 但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-28 16:02:18,"Jark Wu"  写道:
>> 因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>> 1.12 中已经支持读取复杂结构为 string 类型了。
>> 
>> Best,
>> Jark
>> 
>> On Tue, 28 Jul 2020 at 15:36, air23  wrote:
>> 
>>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>> 
>>> 
>>> {
>>>"data":[
>>>{
>>>"op_id":"97037138",
>>>"order_id":"84172164"
>>>}
>>>],
>>>"database":"order_11",
>>>"es":1595720375000,
>>>"id":17469027,
>>>"isDdl":false,
>>>"mysqlType":{
>>>"op_id":"int(11)",
>>>"order_id":"int(11)"
>>>},
>>>"old":null,
>>>"pkNames":[
>>>"op_id"
>>>],
>>>"sql":"",
>>>"sqlType":{
>>>"op_id":4,
>>>"order_id":4
>>>},
>>>"table":"order_product",
>>>"ts":1595720375837,
>>>"type":"INSERT"
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
 有kafka 中json 数据的样例不?
 有没有看过 TaskManager 中有没有异常 log 信息?
 
 
 
 On Tue, 28 Jul 2020 at 09:40, air23  wrote:
 
> 你好 测试代码如下
> 
> 
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'source_databases'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
> public static void main(String[] args) throws Exception {
> 
> 
> //bink table
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings bsSettings =
> 
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
> 
>TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
> 
> 
> tableResult.print();
> 
>Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
> 
> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
> 
> bsEnv.execute("aa");
> 
> }
> 
> 
> 
> 
> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> ,order_operation_time
> ,inventory_batch_log
> ,order_log
> ,order_address_book
> ,product_inventory
> ,order_physical_relation
> ,bil_business_attach
> ,picking_detail
> ,picking_detail
> ,orders
> 
> 
> 
> 
> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> 看到例子都是useOldPlanner 来转table的。
> 致谢
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> 抱歉,还是没有看到附件。
>> 如果是文本的话,你可以直接贴到邮件里。
>> 
>> On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> 
>>> 我再上传一次
>>> 
>>> 在2020年07月27日 18:55,Jark Wu  写道:
>>> 
>>> Hi,
>>> 你的附件好像没有上传。
>>> 
>>> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>>> 
 *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
>>> 不能取到data呢?*
 
 private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
>>> (\n"
> +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order_source'," +
" 'properties.bootstrap.servers' = '***'," +
" 'properties.group.id' = 'real1'," +
" 'format' = 'json'," +
" 'scan.startup.mode' = 'earliest-offset'" +
")";
 
 
 具体见附件 有打印
 
 
 
 
 
>>> 
>>> 
> 
>>> 



Re:Re: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致
另外想请教下 1.11 版本  datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的,
但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。

















在 2020-07-28 16:02:18,"Jark Wu"  写道:
>因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>1.12 中已经支持读取复杂结构为 string 类型了。
>
>Best,
>Jark
>
>On Tue, 28 Jul 2020 at 15:36, air23  wrote:
>
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>
>>
>> {
>> "data":[
>> {
>> "op_id":"97037138",
>> "order_id":"84172164"
>> }
>> ],
>> "database":"order_11",
>> "es":1595720375000,
>> "id":17469027,
>> "isDdl":false,
>> "mysqlType":{
>> "op_id":"int(11)",
>> "order_id":"int(11)"
>> },
>> "old":null,
>> "pkNames":[
>> "op_id"
>> ],
>> "sql":"",
>> "sqlType":{
>> "op_id":4,
>> "order_id":4
>> },
>> "table":"order_product",
>> "ts":1595720375837,
>> "type":"INSERT"
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
>> >有kafka 中json 数据的样例不?
>> >有没有看过 TaskManager 中有没有异常 log 信息?
>> >
>> >
>> >
>> >On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>> >
>> >> 你好 测试代码如下
>> >>
>> >>
>> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> >> " `data` VARCHAR , " +
>> >> " `table` VARCHAR " +
>> >> ") WITH (" +
>> >> " 'connector' = 'kafka'," +
>> >> " 'topic' = 'source_databases'," +
>> >> " 'properties.bootstrap.servers' = '***'," +
>> >> " 'properties.group.id' = 'real1'," +
>> >> " 'format' = 'json'," +
>> >> " 'scan.startup.mode' = 'earliest-offset'" +
>> >> ")";
>> >> public static void main(String[] args) throws Exception {
>> >>
>> >>
>> >> //bink table
>> >> StreamExecutionEnvironment bsEnv =
>> >> StreamExecutionEnvironment.getExecutionEnvironment();
>> >> EnvironmentSettings bsSettings =
>> >>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >> StreamTableEnvironment bsTableEnv =
>> >> StreamTableEnvironment.create(bsEnv, bsSettings);
>> >>
>> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>> >>
>> >>
>> >> tableResult.print();
>> >>
>> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>> >>
>> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>> >>
>> >> bsEnv.execute("aa");
>> >>
>> >> }
>> >>
>> >>
>> >>
>> >>
>> >> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>> >> ,order_operation_time
>> >> ,inventory_batch_log
>> >> ,order_log
>> >> ,order_address_book
>> >> ,product_inventory
>> >> ,order_physical_relation
>> >> ,bil_business_attach
>> >> ,picking_detail
>> >> ,picking_detail
>> >> ,orders
>> >>
>> >>
>> >>
>> >>
>> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> >> 看到例子都是useOldPlanner 来转table的。
>> >> 致谢
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> >> >抱歉,还是没有看到附件。
>> >> >如果是文本的话,你可以直接贴到邮件里。
>> >> >
>> >> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> >> >
>> >> >> 我再上传一次
>> >> >>
>> >> >> 在2020年07月27日 18:55,Jark Wu  写道:
>> >> >>
>> >> >> Hi,
>> >> >> 你的附件好像没有上传。
>> >> >>
>> >> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>> >> >>
>> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
>> 不能取到data呢?*
>> >> >> >
>> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
>> (\n"
>> >> +
>> >> >> > " `data` VARCHAR , " +
>> >> >> > " `table` VARCHAR " +
>> >> >> > ") WITH (" +
>> >> >> > " 'connector' = 'kafka'," +
>> >> >> > " 'topic' = 'order_source'," +
>> >> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> >> > " 'properties.group.id' = 'real1'," +
>> >> >> > " 'format' = 'json'," +
>> >> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> >> > ")";
>> >> >> >
>> >> >> >
>> >> >> > 具体见附件 有打印
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >>
>> >> >>
>> >>
>>


Re:回复: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
你好 使用的是1.11.1版本的 



















在 2020-07-28 16:02:30,"明启 孙" <374060...@qq.com> 写道:
>你的flink什么版本
>
>发送自 Windows 10 版邮件应用
>
>发件人: air23
>发送时间: 2020年7月28日 15:36
>收件人: user-zh@flink.apache.org
>主题: Re:Re: Re: 解析kafka的mysql binlog问题
>
>格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
>{
>"data":[
>{
>"op_id":"97037138",
>"order_id":"84172164"
>}
>],
>"database":"order_11",
>"es":1595720375000,
>"id":17469027,
>"isDdl":false,
>"mysqlType":{
>"op_id":"int(11)",
>"order_id":"int(11)"
>},
>"old":null,
>"pkNames":[
>"op_id"
>],
>"sql":"",
>"sqlType":{
>"op_id":4,
>"order_id":4
>},
>"table":"order_product",
>"ts":1595720375837,
>"type":"INSERT"
>}
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-07-28 14:44:35,"Jark Wu"  写道:
>>有kafka 中json 数据的样例不?
>>有没有看过 TaskManager 中有没有异常 log 信息?
>>
>>
>>
>>On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>>
>>> 你好 测试代码如下
>>>
>>>
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>> " `data` VARCHAR , " +
>>> " `table` VARCHAR " +
>>> ") WITH (" +
>>> " 'connector' = 'kafka'," +
>>> " 'topic' = 'source_databases'," +
>>> " 'properties.bootstrap.servers' = '***'," +
>>> " 'properties.group.id' = 'real1'," +
>>> " 'format' = 'json'," +
>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>> ")";
>>> public static void main(String[] args) throws Exception {
>>>
>>>
>>> //bink table
>>> StreamExecutionEnvironment bsEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> StreamTableEnvironment bsTableEnv =
>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>>
>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>>
>>>
>>> tableResult.print();
>>>
>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>>
>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>>
>>> bsEnv.execute("aa");
>>>
>>> }
>>>
>>>
>>>
>>>
>>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>>> ,order_operation_time
>>> ,inventory_batch_log
>>> ,order_log
>>> ,order_address_book
>>> ,product_inventory
>>> ,order_physical_relation
>>> ,bil_business_attach
>>> ,picking_detail
>>> ,picking_detail
>>> ,orders
>>>
>>>
>>>
>>>
>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>> 看到例子都是useOldPlanner 来转table的。
>>> 致谢
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>>> >抱歉,还是没有看到附件。
>>> >如果是文本的话,你可以直接贴到邮件里。
>>> >
>>> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>>> >
>>> >> 我再上传一次
>>> >>
>>> >> 在2020年07月27日 18:55,Jark Wu  写道:
>>> >>
>>> >> Hi,
>>> >> 你的附件好像没有上传。
>>> >>
>>> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>>> >>
>>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>>> >> >
>>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>> +
>>> >> > " `data` VARCHAR , " +
>>> >> > " `table` VARCHAR " +
>>> >> > ") WITH (" +
>>> >> > " 'connector' = 'kafka'," +
>>> >> > " 'topic' = 'order_source'," +
>>> >> > " 'properties.bootstrap.servers' = '***'," +
>>> >> > " 'properties.group.id' = 'real1'," +
>>> >> > " 'format' = 'json'," +
>>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>>> >> > ")";
>>> >> >
>>> >> >
>>> >> > 具体见附件 有打印
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >>
>>> >>
>>>
>


Re:Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
你好。
我猜测 是有可能是这个问题。但是我这个topic是 读取的一个库的binlog。有很多表   所以ARRAY>
这种 里面 不是固定的
 所以我想用datastream 解析 然后在根据表不同 解析成不同的table。但是发现blinkplaner 好像不可以datastream 
转换为table。或者是我没有发现这个例子
谢谢

















在 2020-07-28 16:05:55,"admin" <17626017...@163.com> 写道:
>data格式不是string,可以定义为ARRAY>
>
>> 2020年7月28日 下午3:35,air23  写道:
>> 
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>> 
>> 
>> {
>>"data":[
>>{
>>"op_id":"97037138",
>>"order_id":"84172164"
>>}
>>],
>>"database":"order_11",
>>"es":1595720375000,
>>"id":17469027,
>>"isDdl":false,
>>"mysqlType":{
>>"op_id":"int(11)",
>>"order_id":"int(11)"
>>},
>>"old":null,
>>"pkNames":[
>>"op_id"
>>],
>>"sql":"",
>>"sqlType":{
>>"op_id":4,
>>"order_id":4
>>},
>>"table":"order_product",
>>"ts":1595720375837,
>>"type":"INSERT"
>> }
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
>>> 有kafka 中json 数据的样例不?
>>> 有没有看过 TaskManager 中有没有异常 log 信息?
>>> 
>>> 
>>> 
>>> On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>>> 
 你好 测试代码如下
 
 
 private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
 " `data` VARCHAR , " +
 " `table` VARCHAR " +
 ") WITH (" +
 " 'connector' = 'kafka'," +
 " 'topic' = 'source_databases'," +
 " 'properties.bootstrap.servers' = '***'," +
 " 'properties.group.id' = 'real1'," +
 " 'format' = 'json'," +
 " 'scan.startup.mode' = 'earliest-offset'" +
 ")";
 public static void main(String[] args) throws Exception {
 
 
 //bink table
 StreamExecutionEnvironment bsEnv =
 StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
 EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
 StreamTableEnvironment.create(bsEnv, bsSettings);
 
TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
 
 
 tableResult.print();
 
Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
 
 bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
 
 bsEnv.execute("aa");
 
 }
 
 
 
 
 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
 ,order_operation_time
 ,inventory_batch_log
 ,order_log
 ,order_address_book
 ,product_inventory
 ,order_physical_relation
 ,bil_business_attach
 ,picking_detail
 ,picking_detail
 ,orders
 
 
 
 
 另外再问个问题。1.11版本 blink 不能datastream转table吗?
 看到例子都是useOldPlanner 来转table的。
 致谢
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 在 2020-07-27 19:44:10,"Jark Wu"  写道:
> 抱歉,还是没有看到附件。
> 如果是文本的话,你可以直接贴到邮件里。
> 
> On Mon, 27 Jul 2020 at 19:22, air23  wrote:
> 
>> 我再上传一次
>> 
>> 在2020年07月27日 18:55,Jark Wu  写道:
>> 
>> Hi,
>> 你的附件好像没有上传。
>> 
>> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>> 
>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>>> 
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
 +
>>>" `data` VARCHAR , " +
>>>" `table` VARCHAR " +
>>>") WITH (" +
>>>" 'connector' = 'kafka'," +
>>>" 'topic' = 'order_source'," +
>>>" 'properties.bootstrap.servers' = '***'," +
>>>" 'properties.group.id' = 'real1'," +
>>>" 'format' = 'json'," +
>>>" 'scan.startup.mode' = 'earliest-offset'" +
>>>")";
>>> 
>>> 
>>> 具体见附件 有打印
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> 
 


Re: 解析kafka的mysql binlog问题

2020-07-28 文章 admin
data格式不是string,可以定义为ARRAY>

> 2020年7月28日 下午3:35,air23  写道:
> 
> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
> 
> 
> {
>"data":[
>{
>"op_id":"97037138",
>"order_id":"84172164"
>}
>],
>"database":"order_11",
>"es":1595720375000,
>"id":17469027,
>"isDdl":false,
>"mysqlType":{
>"op_id":"int(11)",
>"order_id":"int(11)"
>},
>"old":null,
>"pkNames":[
>"op_id"
>],
>"sql":"",
>"sqlType":{
>"op_id":4,
>"order_id":4
>},
>"table":"order_product",
>"ts":1595720375837,
>"type":"INSERT"
> }
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
>> 有kafka 中json 数据的样例不?
>> 有没有看过 TaskManager 中有没有异常 log 信息?
>> 
>> 
>> 
>> On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>> 
>>> 你好 测试代码如下
>>> 
>>> 
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>> " `data` VARCHAR , " +
>>> " `table` VARCHAR " +
>>> ") WITH (" +
>>> " 'connector' = 'kafka'," +
>>> " 'topic' = 'source_databases'," +
>>> " 'properties.bootstrap.servers' = '***'," +
>>> " 'properties.group.id' = 'real1'," +
>>> " 'format' = 'json'," +
>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>> ")";
>>> public static void main(String[] args) throws Exception {
>>> 
>>> 
>>> //bink table
>>> StreamExecutionEnvironment bsEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>StreamTableEnvironment bsTableEnv =
>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>> 
>>>TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>> 
>>> 
>>> tableResult.print();
>>> 
>>>Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>> 
>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>> 
>>> bsEnv.execute("aa");
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>>> ,order_operation_time
>>> ,inventory_batch_log
>>> ,order_log
>>> ,order_address_book
>>> ,product_inventory
>>> ,order_physical_relation
>>> ,bil_business_attach
>>> ,picking_detail
>>> ,picking_detail
>>> ,orders
>>> 
>>> 
>>> 
>>> 
>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>> 看到例子都是useOldPlanner 来转table的。
>>> 致谢
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
 抱歉,还是没有看到附件。
 如果是文本的话,你可以直接贴到邮件里。
 
 On Mon, 27 Jul 2020 at 19:22, air23  wrote:
 
> 我再上传一次
> 
> 在2020年07月27日 18:55,Jark Wu  写道:
> 
> Hi,
> 你的附件好像没有上传。
> 
> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
> 
>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> 
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>> +
>>" `data` VARCHAR , " +
>>" `table` VARCHAR " +
>>") WITH (" +
>>" 'connector' = 'kafka'," +
>>" 'topic' = 'order_source'," +
>>" 'properties.bootstrap.servers' = '***'," +
>>" 'properties.group.id' = 'real1'," +
>>" 'format' = 'json'," +
>>" 'scan.startup.mode' = 'earliest-offset'" +
>>")";
>> 
>> 
>> 具体见附件 有打印
>> 
>> 
>> 
>> 
>> 
> 
> 
>>> 



Re: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 Jark Wu
因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
1.12 中已经支持读取复杂结构为 string 类型了。

Best,
Jark

On Tue, 28 Jul 2020 at 15:36, air23  wrote:

> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
> {
> "data":[
> {
> "op_id":"97037138",
> "order_id":"84172164"
> }
> ],
> "database":"order_11",
> "es":1595720375000,
> "id":17469027,
> "isDdl":false,
> "mysqlType":{
> "op_id":"int(11)",
> "order_id":"int(11)"
> },
> "old":null,
> "pkNames":[
> "op_id"
> ],
> "sql":"",
> "sqlType":{
> "op_id":4,
> "order_id":4
> },
> "table":"order_product",
> "ts":1595720375837,
> "type":"INSERT"
> }
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
> >有kafka 中json 数据的样例不?
> >有没有看过 TaskManager 中有没有异常 log 信息?
> >
> >
> >
> >On Tue, 28 Jul 2020 at 09:40, air23  wrote:
> >
> >> 你好 测试代码如下
> >>
> >>
> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> >> " `data` VARCHAR , " +
> >> " `table` VARCHAR " +
> >> ") WITH (" +
> >> " 'connector' = 'kafka'," +
> >> " 'topic' = 'source_databases'," +
> >> " 'properties.bootstrap.servers' = '***'," +
> >> " 'properties.group.id' = 'real1'," +
> >> " 'format' = 'json'," +
> >> " 'scan.startup.mode' = 'earliest-offset'" +
> >> ")";
> >> public static void main(String[] args) throws Exception {
> >>
> >>
> >> //bink table
> >> StreamExecutionEnvironment bsEnv =
> >> StreamExecutionEnvironment.getExecutionEnvironment();
> >> EnvironmentSettings bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >> StreamTableEnvironment bsTableEnv =
> >> StreamTableEnvironment.create(bsEnv, bsSettings);
> >>
> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
> >>
> >>
> >> tableResult.print();
> >>
> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
> >>
> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
> >>
> >> bsEnv.execute("aa");
> >>
> >> }
> >>
> >>
> >>
> >>
> >> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> >> ,order_operation_time
> >> ,inventory_batch_log
> >> ,order_log
> >> ,order_address_book
> >> ,product_inventory
> >> ,order_physical_relation
> >> ,bil_business_attach
> >> ,picking_detail
> >> ,picking_detail
> >> ,orders
> >>
> >>
> >>
> >>
> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> >> 看到例子都是useOldPlanner 来转table的。
> >> 致谢
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
> >> >抱歉,还是没有看到附件。
> >> >如果是文本的话,你可以直接贴到邮件里。
> >> >
> >> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
> >> >
> >> >> 我再上传一次
> >> >>
> >> >> 在2020年07月27日 18:55,Jark Wu  写道:
> >> >>
> >> >> Hi,
> >> >> 你的附件好像没有上传。
> >> >>
> >> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
> >> >>
> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
> 不能取到data呢?*
> >> >> >
> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
> (\n"
> >> +
> >> >> > " `data` VARCHAR , " +
> >> >> > " `table` VARCHAR " +
> >> >> > ") WITH (" +
> >> >> > " 'connector' = 'kafka'," +
> >> >> > " 'topic' = 'order_source'," +
> >> >> > " 'properties.bootstrap.servers' = '***'," +
> >> >> > " 'properties.group.id' = 'real1'," +
> >> >> > " 'format' = 'json'," +
> >> >> > " 'scan.startup.mode' = 'earliest-offset'" +
> >> >> > ")";
> >> >> >
> >> >> >
> >> >> > 具体见附件 有打印
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >> >>
> >>
>


回复: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 明启 孙
你的flink什么版本

发送自 Windows 10 版邮件应用

发件人: air23
发送时间: 2020年7月28日 15:36
收件人: user-zh@flink.apache.org
主题: Re:Re: Re: 解析kafka的mysql binlog问题

格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来


{
"data":[
{
"op_id":"97037138",
"order_id":"84172164"
}
],
"database":"order_11",
"es":1595720375000,
"id":17469027,
"isDdl":false,
"mysqlType":{
"op_id":"int(11)",
"order_id":"int(11)"
},
"old":null,
"pkNames":[
"op_id"
],
"sql":"",
"sqlType":{
"op_id":4,
"order_id":4
},
"table":"order_product",
"ts":1595720375837,
"type":"INSERT"
}

















在 2020-07-28 14:44:35,"Jark Wu"  写道:
>有kafka 中json 数据的样例不?
>有没有看过 TaskManager 中有没有异常 log 信息?
>
>
>
>On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>
>> 你好 测试代码如下
>>
>>
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> " `data` VARCHAR , " +
>> " `table` VARCHAR " +
>> ") WITH (" +
>> " 'connector' = 'kafka'," +
>> " 'topic' = 'source_databases'," +
>> " 'properties.bootstrap.servers' = '***'," +
>> " 'properties.group.id' = 'real1'," +
>> " 'format' = 'json'," +
>> " 'scan.startup.mode' = 'earliest-offset'" +
>> ")";
>> public static void main(String[] args) throws Exception {
>>
>>
>> //bink table
>> StreamExecutionEnvironment bsEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv =
>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>
>>
>> tableResult.print();
>>
>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>
>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>
>> bsEnv.execute("aa");
>>
>> }
>>
>>
>>
>>
>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>> ,order_operation_time
>> ,inventory_batch_log
>> ,order_log
>> ,order_address_book
>> ,product_inventory
>> ,order_physical_relation
>> ,bil_business_attach
>> ,picking_detail
>> ,picking_detail
>> ,orders
>>
>>
>>
>>
>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> 看到例子都是useOldPlanner 来转table的。
>> 致谢
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> >抱歉,还是没有看到附件。
>> >如果是文本的话,你可以直接贴到邮件里。
>> >
>> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> >
>> >> 我再上传一次
>> >>
>> >> 在2020年07月27日 18:55,Jark Wu  写道:
>> >>
>> >> Hi,
>> >> 你的附件好像没有上传。
>> >>
>> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>> >>
>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> >> >
>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>> +
>> >> > " `data` VARCHAR , " +
>> >> > " `table` VARCHAR " +
>> >> > ") WITH (" +
>> >> > " 'connector' = 'kafka'," +
>> >> > " 'topic' = 'order_source'," +
>> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> > " 'properties.group.id' = 'real1'," +
>> >> > " 'format' = 'json'," +
>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> > ")";
>> >> >
>> >> >
>> >> > 具体见附件 有打印
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>>



Re:Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来


{
"data":[
{
"op_id":"97037138",
"order_id":"84172164"
}
],
"database":"order_11",
"es":1595720375000,
"id":17469027,
"isDdl":false,
"mysqlType":{
"op_id":"int(11)",
"order_id":"int(11)"
},
"old":null,
"pkNames":[
"op_id"
],
"sql":"",
"sqlType":{
"op_id":4,
"order_id":4
},
"table":"order_product",
"ts":1595720375837,
"type":"INSERT"
}

















在 2020-07-28 14:44:35,"Jark Wu"  写道:
>有kafka 中json 数据的样例不?
>有没有看过 TaskManager 中有没有异常 log 信息?
>
>
>
>On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>
>> 你好 测试代码如下
>>
>>
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> " `data` VARCHAR , " +
>> " `table` VARCHAR " +
>> ") WITH (" +
>> " 'connector' = 'kafka'," +
>> " 'topic' = 'source_databases'," +
>> " 'properties.bootstrap.servers' = '***'," +
>> " 'properties.group.id' = 'real1'," +
>> " 'format' = 'json'," +
>> " 'scan.startup.mode' = 'earliest-offset'" +
>> ")";
>> public static void main(String[] args) throws Exception {
>>
>>
>> //bink table
>> StreamExecutionEnvironment bsEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv =
>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>
>>
>> tableResult.print();
>>
>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>
>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>
>> bsEnv.execute("aa");
>>
>> }
>>
>>
>>
>>
>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>> ,order_operation_time
>> ,inventory_batch_log
>> ,order_log
>> ,order_address_book
>> ,product_inventory
>> ,order_physical_relation
>> ,bil_business_attach
>> ,picking_detail
>> ,picking_detail
>> ,orders
>>
>>
>>
>>
>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> 看到例子都是useOldPlanner 来转table的。
>> 致谢
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> >抱歉,还是没有看到附件。
>> >如果是文本的话,你可以直接贴到邮件里。
>> >
>> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> >
>> >> 我再上传一次
>> >>
>> >> 在2020年07月27日 18:55,Jark Wu  写道:
>> >>
>> >> Hi,
>> >> 你的附件好像没有上传。
>> >>
>> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>> >>
>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> >> >
>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>> +
>> >> > " `data` VARCHAR , " +
>> >> > " `table` VARCHAR " +
>> >> > ") WITH (" +
>> >> > " 'connector' = 'kafka'," +
>> >> > " 'topic' = 'order_source'," +
>> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> > " 'properties.group.id' = 'real1'," +
>> >> > " 'format' = 'json'," +
>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> > ")";
>> >> >
>> >> >
>> >> > 具体见附件 有打印
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>>


Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 Jark Wu
有kafka 中json 数据的样例不?
有没有看过 TaskManager 中有没有异常 log 信息?



On Tue, 28 Jul 2020 at 09:40, air23  wrote:

> 你好 测试代码如下
>
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'source_databases'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
> public static void main(String[] args) throws Exception {
>
>
> //bink table
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
>
> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>
>
> tableResult.print();
>
> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>
> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>
> bsEnv.execute("aa");
>
> }
>
>
>
>
> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> ,order_operation_time
> ,inventory_batch_log
> ,order_log
> ,order_address_book
> ,product_inventory
> ,order_physical_relation
> ,bil_business_attach
> ,picking_detail
> ,picking_detail
> ,orders
>
>
>
>
> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> 看到例子都是useOldPlanner 来转table的。
> 致谢
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
> >抱歉,还是没有看到附件。
> >如果是文本的话,你可以直接贴到邮件里。
> >
> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
> >
> >> 我再上传一次
> >>
> >> 在2020年07月27日 18:55,Jark Wu  写道:
> >>
> >> Hi,
> >> 你的附件好像没有上传。
> >>
> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
> >>
> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
> >> >
> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
> +
> >> > " `data` VARCHAR , " +
> >> > " `table` VARCHAR " +
> >> > ") WITH (" +
> >> > " 'connector' = 'kafka'," +
> >> > " 'topic' = 'order_source'," +
> >> > " 'properties.bootstrap.servers' = '***'," +
> >> > " 'properties.group.id' = 'real1'," +
> >> > " 'format' = 'json'," +
> >> > " 'scan.startup.mode' = 'earliest-offset'" +
> >> > ")";
> >> >
> >> >
> >> > 具体见附件 有打印
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >>
>


Re:Re: 解析kafka的mysql binlog问题

2020-07-27 文章 air23
你好 测试代码如下


private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'source_databases'," +
" 'properties.bootstrap.servers' = '***'," +
" 'properties.group.id' = 'real1'," +
" 'format' = 'json'," +
" 'scan.startup.mode' = 'earliest-offset'" +
")";
public static void main(String[] args) throws Exception {


//bink table
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);

TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);


tableResult.print();

Table table = bsTableEnv.sqlQuery("select * from kafkaTable");

bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);

bsEnv.execute("aa");

}




输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
,order_operation_time
,inventory_batch_log
,order_log
,order_address_book
,product_inventory
,order_physical_relation
,bil_business_attach
,picking_detail
,picking_detail
,orders




另外再问个问题。1.11版本 blink 不能datastream转table吗?
看到例子都是useOldPlanner 来转table的。
致谢














在 2020-07-27 19:44:10,"Jark Wu"  写道:
>抱歉,还是没有看到附件。
>如果是文本的话,你可以直接贴到邮件里。
>
>On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>
>> 我再上传一次
>>
>> 在2020年07月27日 18:55,Jark Wu  写道:
>>
>> Hi,
>> 你的附件好像没有上传。
>>
>> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>>
>> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> >
>> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> > " `data` VARCHAR , " +
>> > " `table` VARCHAR " +
>> > ") WITH (" +
>> > " 'connector' = 'kafka'," +
>> > " 'topic' = 'order_source'," +
>> > " 'properties.bootstrap.servers' = '***'," +
>> > " 'properties.group.id' = 'real1'," +
>> > " 'format' = 'json'," +
>> > " 'scan.startup.mode' = 'earliest-offset'" +
>> > ")";
>> >
>> >
>> > 具体见附件 有打印
>> >
>> >
>> >
>> >
>> >
>>
>>


Re:回复:解析kafka的mysql binlog问题

2020-07-27 文章 RS
Hi,
附近应该是收不到的,包括图片啥的
只能回复纯文本,贴代码,如果真的需要图片的话,可以上传到其他的网站上,然后给个连接跳转过去





在 2020-07-27 19:21:51,"air23"  写道:

我再上传一次 


在2020年07月27日 18:55,Jark Wu 写道:
Hi,
你的附件好像没有上传。

On Mon, 27 Jul 2020 at 18:17, air23  wrote:

> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'order_source'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
>
>
> 具体见附件 有打印
>
>
>
>
>


Re: 解析kafka的mysql binlog问题

2020-07-27 文章 Jark Wu
抱歉,还是没有看到附件。
如果是文本的话,你可以直接贴到邮件里。

On Mon, 27 Jul 2020 at 19:22, air23  wrote:

> 我再上传一次
>
> 在2020年07月27日 18:55,Jark Wu  写道:
>
> Hi,
> 你的附件好像没有上传。
>
> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>
> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
> >
> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> > " `data` VARCHAR , " +
> > " `table` VARCHAR " +
> > ") WITH (" +
> > " 'connector' = 'kafka'," +
> > " 'topic' = 'order_source'," +
> > " 'properties.bootstrap.servers' = '***'," +
> > " 'properties.group.id' = 'real1'," +
> > " 'format' = 'json'," +
> > " 'scan.startup.mode' = 'earliest-offset'" +
> > ")";
> >
> >
> > 具体见附件 有打印
> >
> >
> >
> >
> >
>
>


回复:解析kafka的mysql binlog问题

2020-07-27 文章 air23
我再上传一次 


在2020年07月27日 18:55,Jark Wu 写道:
Hi,
你的附件好像没有上传。

On Mon, 27 Jul 2020 at 18:17, air23  wrote:

> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'order_source'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
>
>
> 具体见附件 有打印
>
>
>
>
>


Re: 解析kafka的mysql binlog问题

2020-07-27 文章 Jark Wu
Hi,
你的附件好像没有上传。

On Mon, 27 Jul 2020 at 18:17, air23  wrote:

> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'order_source'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
>
>
> 具体见附件 有打印
>
>
>
>
>


解析kafka的mysql binlog问题

2020-07-27 文章 air23
你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?

private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order_source'," +
" 'properties.bootstrap.servers' = '***'," +
" 'properties.group.id' = 'real1'," +
" 'format' = 'json'," +
" 'scan.startup.mode' = 'earliest-offset'" +

")";




具体见附件 有打印