Reading it directly as a string is not supported in 1.11; it will be fixed in 1.12, see the JIRA issue [1].

[1] https://issues.apache.org/jira/browse/FLINK-18002
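
For reference, a rough sketch of what the DDL could look like on 1.12 once FLINK-18002 lands (same connector options as your table; untested, since 1.12 is not released yet):

CREATE TABLE kafkaTable (
  `data` STRING,   -- 1.12: a complex JSON structure (array/object) can be read as a raw string
  `table` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'source_databases',
  'properties.bootstrap.servers' = '***',
  'properties.group.id' = 'real1',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);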

> On Jul 28, 2020, at 5:20 PM, air23 <wangfei23_...@163.com> wrote:
> 
> Hi, got it, thanks. Since this topic carries the binlogs of many tables, parsing `data` as an array doesn't work either: the number of fields and the field names differ from table to table.
> One more thing I'd like to ask: in 1.11, can a DataStream be converted into a blink-planner Table? The examples I've seen all seem to use useOldPlanner for the conversion,
> but I want to use blink to write into Hive. My requirement is binlog -> flink -> hive, writing different MySQL tables into different Hive tables. A sketch of what I mean follows.
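> 
> Roughly what I'm hoping to do (just a sketch, not verified on my side; class names from the 1.11 blink planner, imports as in my test code further down the thread plus org.apache.flink.streaming.api.datastream.DataStream):
> 
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>         .useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
> 
> // stand-in for the real parsed-binlog stream (hypothetical sample record)
> DataStream<String> binlog = env.fromElements("{\"table\":\"order_product\"}");
> 
> // DataStream -> blink-planner Table; an atomic String becomes one column "f0"
> Table t = tEnv.fromDataStream(binlog);
> 
> // goal from here: write each MySQL table's rows into its own Hive table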
> 
> At 2020-07-28 16:02:18, "Jark Wu" <imj...@gmail.com> wrote:
>> Because "data" is a complex structure rather than a plain string, this is not supported up to and including 1.11.
>> 1.12 already supports reading a complex structure as the STRING type.
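>> 
>> If the topic carried only this one table, a 1.11 workaround would be to declare the field with its concrete structure, e.g. (a sketch based on your sample; I haven't run it):
>> 
>> CREATE TABLE kafkaTable (
>>   `data` ARRAY<ROW<op_id STRING, order_id STRING>>,
>>   `table` STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'source_databases',
>>   'properties.bootstrap.servers' = '***',
>>   'properties.group.id' = 'real1',
>>   'format' = 'json',
>>   'scan.startup.mode' = 'earliest-offset'
>> );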
>> 
>> Best,
>> Jark
>> 
>> On Tue, 28 Jul 2020 at 15:36, air23 <wangfei23_...@163.com> wrote:
>> 
>>> The format is as follows. It is canal-parsed binlog, and `data` holds an array. That field can't be read out as a string, while all the other fields can.
>>> 
>>> 
>>> {
>>>    "data":[
>>>        {
>>>            "op_id":"97037138",
>>>            "order_id":"84172164"
>>>        }
>>>    ],
>>>    "database":"order_11",
>>>    "es":1595720375000,
>>>    "id":17469027,
>>>    "isDdl":false,
>>>    "mysqlType":{
>>>        "op_id":"int(11)",
>>>        "order_id":"int(11)"
>>>    },
>>>    "old":null,
>>>    "pkNames":[
>>>        "op_id"
>>>    ],
>>>    "sql":"",
>>>    "sqlType":{
>>>        "op_id":4,
>>>        "order_id":4
>>>    },
>>>    "table":"order_product",
>>>    "ts":1595720375837,
>>>    "type":"INSERT"
>>> }
>>> 
>>> At 2020-07-28 14:44:35, "Jark Wu" <imj...@gmail.com> wrote:
>>>> Do you have a sample of the JSON data in Kafka?
>>>> Have you checked the TaskManager logs for any exception messages?
>>>> 
>>>> 
>>>> 
>>>> On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote:
>>>> 
>>>>> Hi, the test code is as follows:
>>>>> 
>>>>> 
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>> import org.apache.flink.table.api.Table;
>>>>> import org.apache.flink.table.api.TableResult;
>>>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>>>> import org.apache.flink.types.Row;
>>>>> 
>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>>>>         " `data` VARCHAR , " +
>>>>>         " `table` VARCHAR " +
>>>>>         ") WITH (" +
>>>>>         " 'connector' = 'kafka'," +
>>>>>         " 'topic' = 'source_databases'," +
>>>>>         " 'properties.bootstrap.servers' = '***'," +
>>>>>         " 'properties.group.id' = 'real1'," +
>>>>>         " 'format' = 'json'," +
>>>>>         " 'scan.startup.mode' = 'earliest-offset'" +
>>>>>         ")";
>>>>> 
>>>>> public static void main(String[] args) throws Exception {
>>>>>     // blink table planner, streaming mode
>>>>>     StreamExecutionEnvironment bsEnv =
>>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
>>>>>             .useBlinkPlanner().inStreamingMode().build();
>>>>>     StreamTableEnvironment bsTableEnv =
>>>>>             StreamTableEnvironment.create(bsEnv, bsSettings);
>>>>> 
>>>>>     // register the Kafka source table
>>>>>     TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>>>>     tableResult.print();
>>>>> 
>>>>>     Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>>>> 
>>>>>     // convert the Table to an append stream and print every row
>>>>>     bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>>>> 
>>>>>     bsEnv.execute("aa");
>>>>> }
>>>>> 
>>>>> The output is as follows; `data` is always empty. The data format is MySQL binlog parsed by canal:
>>>>> ,order_operation_time
>>>>> ,inventory_batch_log
>>>>> ,order_log
>>>>> ,order_address_book
>>>>> ,product_inventory
>>>>> ,order_physical_relation
>>>>> ,bil_business_attach
>>>>> ,picking_detail
>>>>> ,picking_detail
>>>>> ,orders
>>>>> 
>>>>> One more question: in 1.11, can't the blink planner convert a DataStream into a Table?
>>>>> All the examples I've seen use useOldPlanner for the conversion to Table.
>>>>> Thanks
>>>>> 
>>>>> At 2020-07-27 19:44:10, "Jark Wu" <imj...@gmail.com> wrote:
>>>>>> Sorry, I still don't see the attachment.
>>>>>> If it's plain text, you can paste it directly into the email.
>>>>>> 
>>>>>> On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote:
>>>>>> 
>>>>>>> Let me upload it again.
>>>>>>> 
>>>>>>> On Jul 27, 2020 at 18:55, Jark Wu <imj...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi,
>>>>>>> Your attachment doesn't seem to have come through.
>>>>>>> 
>>>>>>> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote:
>>>>>>> 
>>>>>>>> Hi. This is the SQL I use for parsing. I want to read the `data` field and the `table` field from the binlog. Why can I read `table` but not `data`?
>>>>>>>> 
>>>>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>>>>>>>        " `data` VARCHAR , " +
>>>>>>>>        " `table` VARCHAR " +
>>>>>>>>        ") WITH (" +
>>>>>>>>        " 'connector' = 'kafka'," +
>>>>>>>>        " 'topic' = 'order_source'," +
>>>>>>>>        " 'properties.bootstrap.servers' = '***'," +
>>>>>>>>        " 'properties.group.id' = 'real1'," +
>>>>>>>>        " 'format' = 'json'," +
>>>>>>>>        " 'scan.startup.mode' = 'earliest-offset'" +
>>>>>>>>        ")";
>>>>>>>> 
>>>>>>>> 
>>>>>>>> See the attachment for details; it includes the printed output.
