Hi Dian,

This is great news, really! Hope to see this soon in stable :D

Thanks a lot!

On Fri, 13 May 2022 at 15:25, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Dhavan,
>
> This should be a known issue and has already been fixed in
> https://issues.apache.org/jira/browse/FLINK-27282.  However, this ticket
> was fixed recently and so it's still not released.
>
> Regards,
> Dian
>
> On Thu, May 12, 2022 at 7:26 PM Dhavan Vaidya <dhavan.vai...@kofluence.com>
> wrote:
>
>> Hey Dian,
>>
>> Though my HTTP call's response is indeed JSON, I needed to serialize data
>> into Avro for Kafka sink.
>>
>> Since you said it supports Row inside Row, I investigated deeper and
>> found out that since Row class *sorts by key names*, I had to strictly
>> follow the order in output_type parameter in `process()` function! For
>> example, if my ProcessFunction's output is defined to be following (that
>> is, it yields a structure of this type):
>>
>> Row(
>>   relatedPlaylists=Row(
>>     likes=1,
>>     uploads="1",
>>   )
>> )
>>
>> In `ds = process(MyProcessFunction(), output_type=output_type)`, the
>> output type must be:
>> Types.ROW_NAMED(["relatedPlaylists"], [Types.ROW_NAMED(["likes",
>> "uploads"], [Types.INT(), Types.STRING()])])
>>
>> And I cannot swap the order of "likes" and "uploads". That is, the
>> following is wrong:
>> Types.ROW_NAMED(["relatedPlaylists"], [Types.ROW_NAMED(["uploads",
>> "likes"], [Types.STRING(), Types.INT()])])
>>
>> After making these changes, things indeed work out well - I am able to
>> convert back to table API and sink into Kafka with Confluent Avro
>> serialization. Though I find this ordering very cumbersome :D
>>
>> On Wed, 11 May 2022 at 11:35, Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Dhavan,
>>>
>>> The type of the `ds`  appearing in `t_env.from_data_stream(ds) should be
>>> known. Otherwise, it's impossible to infer the schema of the converted
>>> table, as a result, `raw` type will be used which makes the schema of the
>>> resulting table not expected. You could either declare the type of the `ds`
>>> in the DataStream API via `output_type` or declare the type via
>>> `t_env.from_data_stream(ds, schema)` when converting DataStream to table.
>>>
>>> >> 1. Must I create objects of Flink's datatypes like `
>>> pyflink.common.Row`?
>>> It doesn't necessarily have to be Row type. You could also use other
>>> types, e.g. Integer, String, etc.
>>>
>>> >> 2. What should my schema (for output_type) look like given the
>>> structure of the message: https://pastebin.com/kEFBJSBS
>>> Is the output data a JSON string? If this is the case, I guess you could
>>> just declare the output_type as Types.STRING().
>>>
>>> >> I tried https://pastebin.com/EPT0pB85 but I am getting (kind of
>>> weird) error:
>>> >> https://pastebin.com/KpkZEHtd - here, it seems to me that having
>>> `Row` inside `Row` is causing some issues.
>>> It supports `Row` inside `Row`. Could you share an example which could
>>> reproduce this issue?
>>>
>>> Regards,
>>> Dian
>>>
>>>
>>> On Tue, May 10, 2022 at 9:09 PM Dhavan Vaidya <
>>> dhavan.vai...@kofluence.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am consuming Kafka messages with Table API connector. I cannot use
>>>> DataStream API because it does not support Confluent Avro.
>>>>
>>>> After consuming the messages, I am converting to DataStream API and
>>>> using ProcessFunction. The ProcessFunction makes async http calls and emits
>>>> results with a completely different structure. This works well. If I don't
>>>> do anything, and just do `ds.print()`,  things are printed as expected.
>>>>
>>>> The issues start happening when I convert from DataStream to Table API
>>>> again. To do this effectively and sink back to Kafka (with Confluent Avro
>>>> serialization), I am specifying `output_type` while calling `process`
>>>> on the stream. I have gathered (there might be other ways I am unaware of)
>>>> that if I don't specify the schema here, converting back to Table API (with
>>>> `t_env.from_data_stream(ds)`) makes it very difficult to serialize the
>>>> data.
>>>>
>>>> What I am not sure about are the following things:
>>>> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row
>>>> `?
>>>> 2. What should my schema (for output_type) look like given the
>>>> structure of the message: https://pastebin.com/kEFBJSBS
>>>>
>>>> I tried https://pastebin.com/EPT0pB85 but I am getting (kind of weird)
>>>> error:
>>>> https://pastebin.com/KpkZEHtd - here, it seems to me that having `Row`
>>>> inside `Row` is causing some issues.
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>> Dhavan
>>>>
>>>

Reply via email to