Hi Dhavan,

This is a known issue and has already been fixed in
https://issues.apache.org/jira/browse/FLINK-27282. However, the fix is
recent and has not been released yet.

Regards,
Dian

On Thu, May 12, 2022 at 7:26 PM Dhavan Vaidya <dhavan.vai...@kofluence.com>
wrote:

> Hey Dian,
>
> Though my HTTP call's response is indeed JSON, I needed to serialize the
> data into Avro for the Kafka sink.
>
> Since you said it supports Row inside Row, I investigated further and found
> that, because the Row class *sorts fields by key name*, I had to strictly
> follow that order in the `output_type` parameter of the `process()`
> function! For example, if my ProcessFunction's output is defined as follows
> (that is, it yields a structure of this type):
>
> Row(
>   relatedPlaylists=Row(
>     likes=1,
>     uploads="1",
>   )
> )
>
> In `ds = ds.process(MyProcessFunction(), output_type=output_type)`, the
> output type must be:
> Types.ROW_NAMED(["relatedPlaylists"], [Types.ROW_NAMED(["likes",
> "uploads"], [Types.INT(), Types.STRING()])])
>
> And I cannot swap the order of "likes" and "uploads". That is, the
> following is wrong:
> Types.ROW_NAMED(["relatedPlaylists"], [Types.ROW_NAMED(["uploads",
> "likes"], [Types.STRING(), Types.INT()])])
>
> After making these changes, things indeed work well - I am able to convert
> back to the Table API and sink into Kafka with Confluent Avro serialization.
> Though I find this ordering requirement very cumbersome :D
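>
> Putting it together, here is a minimal sketch of the pattern I ended up
> with (the surrounding pipeline and source stream are omitted):
>
> from pyflink.common import Row, Types
> from pyflink.datastream import ProcessFunction
>
> class MyProcessFunction(ProcessFunction):
>     def process_element(self, value, ctx):
>         # A Row built from kwargs sorts its fields by name, so
>         # "likes" ends up before "uploads" regardless of kwarg order.
>         yield Row(relatedPlaylists=Row(likes=1, uploads="1"))
>
> # The field names in ROW_NAMED must follow the same (sorted) order.
> output_type = Types.ROW_NAMED(
>     ["relatedPlaylists"],
>     [Types.ROW_NAMED(["likes", "uploads"],
>                      [Types.INT(), Types.STRING()])])
>
> ds = ds.process(MyProcessFunction(), output_type=output_type)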
>
> On Wed, 11 May 2022 at 11:35, Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Dhavan,
>>
>> The type of the `ds` appearing in `t_env.from_data_stream(ds)` should be
>> known. Otherwise it's impossible to infer the schema of the converted
>> table; as a result, the `raw` type will be used, which makes the schema of
>> the resulting table different from what you expect. You could either
>> declare the type of the `ds` in the DataStream API via `output_type`, or
>> declare the schema via `t_env.from_data_stream(ds, schema)` when converting
>> the DataStream to a table.
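>>
>> For example, a quick sketch of both options (`ds`, `t_env`, and the field
>> names are just placeholders):
>>
>> from pyflink.common import Row, Types
>> from pyflink.table import DataTypes, Schema
>>
>> # Option 1: declare the type on the DataStream side via output_type
>> ds = ds.map(lambda i: Row(id=i, name=str(i)),
>>             output_type=Types.ROW_NAMED(["id", "name"],
>>                                         [Types.INT(), Types.STRING()]))
>> table = t_env.from_data_stream(ds)
>>
>> # Option 2: declare the schema explicitly during the conversion
>> table = t_env.from_data_stream(
>>     ds,
>>     Schema.new_builder()
>>           .column("id", DataTypes.INT())
>>           .column("name", DataTypes.STRING())
>>           .build())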
>>
>> >> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
>> It doesn't necessarily have to be a Row type. You could also use other
>> types, e.g. Integer, String, etc.
>>
>> >> 2. What should my schema (for output_type) look like given the
>> >> structure of the message: https://pastebin.com/kEFBJSBS
>> Is the output data a JSON string? If this is the case, I guess you could
>> just declare the output_type as Types.STRING().
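>>
>> For instance, something along these lines (the function name and payload
>> are made up for illustration):
>>
>> import json
>>
>> from pyflink.common import Types
>> from pyflink.datastream import ProcessFunction
>>
>> class JsonEmittingFunction(ProcessFunction):
>>     def process_element(self, value, ctx):
>>         # Emit the HTTP response as a plain JSON string.
>>         yield json.dumps({"likes": 1, "uploads": "1"})
>>
>> ds = ds.process(JsonEmittingFunction(), output_type=Types.STRING())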
>>
>> >> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of
>> >> weird) error: https://pastebin.com/KpkZEHtd - here, it seems to me
>> >> that having `Row` inside `Row` is causing some issues.
>> It supports `Row` inside `Row`. Could you share an example which could
>> reproduce this issue?
>>
>> Regards,
>> Dian
>>
>>
>> On Tue, May 10, 2022 at 9:09 PM Dhavan Vaidya <
>> dhavan.vai...@kofluence.com> wrote:
>>
>>> Hello,
>>>
>>> I am consuming Kafka messages with the Table API connector. I cannot use
>>> the DataStream API because it does not support Confluent Avro.
>>>
>>> After consuming the messages, I am converting to the DataStream API and
>>> using a ProcessFunction. The ProcessFunction makes async HTTP calls and
>>> emits results with a completely different structure. This works well: if I
>>> don't do anything else and just call `ds.print()`, things are printed as
>>> expected.
>>>
>>> The issues start when I convert from the DataStream API back to the Table
>>> API. To do this effectively and sink back to Kafka (with Confluent Avro
>>> serialization), I am specifying `output_type` while calling `process` on
>>> the stream. I have gathered (there might be other ways I am unaware of)
>>> that if I don't specify the schema here, converting back to the Table API
>>> (with `t_env.from_data_stream(ds)`) makes it very difficult to serialize
>>> the data.
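>>>
>>> Roughly, the flow looks like this (table and class names are
>>> placeholders):
>>>
>>> table = t_env.from_path("kafka_source")       # Table API source
>>> ds = t_env.to_data_stream(table)              # convert to DataStream
>>> ds = ds.process(MyProcessFunction(),          # async HTTP calls
>>>                 output_type=output_type)
>>> result_table = t_env.from_data_stream(ds)     # back to the Table API
>>> result_table.execute_insert("kafka_sink")     # Confluent Avro sink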
>>>
>>> What I am not sure about are the following things:
>>> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
>>> 2. What should my schema (for output_type) look like given the structure
>>> of the message: https://pastebin.com/kEFBJSBS
>>>
>>> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of weird)
>>> error: https://pastebin.com/KpkZEHtd - here, it seems to me that having
>>> `Row` inside `Row` is causing some issues.
>>>
>>> Thanks!
>>>
>>> --
>>> Dhavan
>>>
>>
