Hi Dhavan,

This is a known issue and has already been fixed in https://issues.apache.org/jira/browse/FLINK-27282. However, the fix landed only recently, so it has not been released yet.
Regards,
Dian

On Thu, May 12, 2022 at 7:26 PM Dhavan Vaidya <dhavan.vai...@kofluence.com> wrote:

> Hey Dian,
>
> Though my HTTP call's response is indeed JSON, I needed to serialize the
> data into Avro for the Kafka sink.
>
> Since you said it supports Row inside Row, I investigated deeper and found
> out that, since the Row class *sorts by key names*, I had to strictly
> follow that order in the output_type parameter of the `process()`
> function! For example, if my ProcessFunction's output is defined to be the
> following (that is, it yields a structure of this type):
>
>     Row(
>         relatedPlaylists=Row(
>             likes=1,
>             uploads="1",
>         )
>     )
>
> then in `ds = ds.process(MyProcessFunction(), output_type=output_type)`,
> the output type must be:
>
>     Types.ROW_NAMED(
>         ["relatedPlaylists"],
>         [Types.ROW_NAMED(["likes", "uploads"],
>                          [Types.INT(), Types.STRING()])])
>
> And I cannot swap the order of "likes" and "uploads". That is, the
> following is wrong:
>
>     Types.ROW_NAMED(
>         ["relatedPlaylists"],
>         [Types.ROW_NAMED(["uploads", "likes"],
>                          [Types.STRING(), Types.INT()])])
>
> After making these changes, things indeed work out well - I am able to
> convert back to the Table API and sink into Kafka with Confluent Avro
> serialization. Though I find this ordering very cumbersome :D
>
> On Wed, 11 May 2022 at 11:35, Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Dhavan,
>>
>> The type of the `ds` appearing in `t_env.from_data_stream(ds)` should be
>> known. Otherwise, it's impossible to infer the schema of the converted
>> table; as a result, the `raw` type will be used, which makes the schema
>> of the resulting table unexpected. You could either declare the type of
>> the `ds` in the DataStream API via `output_type`, or declare the type via
>> `t_env.from_data_stream(ds, schema)` when converting the DataStream to a
>> table.
>>
>> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
>>
>> It doesn't necessarily have to be the Row type. You could also use other
>> types, e.g. Integer, String, etc.
>>
>> 2. What should my schema (for output_type) look like given the structure
>> of the message: https://pastebin.com/kEFBJSBS
>>
>> Is the output data a JSON string? If that is the case, I guess you could
>> just declare the output_type as Types.STRING().
>>
>> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of weird)
>> error: https://pastebin.com/KpkZEHtd - here, it seems to me that having
>> `Row` inside `Row` is causing some issues.
>>
>> It supports `Row` inside `Row`. Could you share an example which could
>> reproduce this issue?
>>
>> Regards,
>> Dian
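To make the ordering constraint described above concrete, here is a minimal, self-contained sketch. The EnrichFunction name and the placeholder source stream are invented for illustration, and it assumes a PyFlink version (such as 1.15 before the FLINK-27282 fix) in which Row sorts keyword arguments by field name, as Dhavan describes:

    from pyflink.common import Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import ProcessFunction, StreamExecutionEnvironment


    class EnrichFunction(ProcessFunction):
        # Hypothetical stand-in for the async-HTTP ProcessFunction.
        def process_element(self, value, ctx):
            # Row sorts its keyword arguments by field name, so the nested
            # fields are effectively ("likes", "uploads") no matter how
            # they are written here.
            yield Row(relatedPlaylists=Row(likes=1, uploads="1"))


    env = StreamExecutionEnvironment.get_execution_environment()
    source = env.from_collection(["placeholder"], type_info=Types.STRING())

    # The nested field names must be listed in the same (sorted) order
    # that Row produces: "likes" before "uploads".
    output_type = Types.ROW_NAMED(
        ["relatedPlaylists"],
        [Types.ROW_NAMED(["likes", "uploads"],
                         [Types.INT(), Types.STRING()])])

    enriched = source.process(EnrichFunction(), output_type=output_type)
    enriched.print()
    env.execute("row-ordering-sketch")

Swapping the nested names to ["uploads", "likes"] would make the declared type disagree with what Row actually emits, which is exactly the mismatch described above.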
>> On Tue, May 10, 2022 at 9:09 PM Dhavan Vaidya <dhavan.vai...@kofluence.com> wrote:
>>
>>> Hello,
>>>
>>> I am consuming Kafka messages with the Table API connector. I cannot
>>> use the DataStream API because it does not support Confluent Avro.
>>>
>>> After consuming the messages, I am converting to the DataStream API and
>>> using a ProcessFunction. The ProcessFunction makes async HTTP calls and
>>> emits results with a completely different structure. This works well:
>>> if I don't do anything else and just call `ds.print()`, things are
>>> printed as expected.
>>>
>>> The issues start when I convert from the DataStream API back to the
>>> Table API. To do this effectively and sink back to Kafka (with
>>> Confluent Avro serialization), I am specifying `output_type` while
>>> calling `process` on the stream. I have gathered (there might be other
>>> ways I am unaware of) that if I don't specify the schema here,
>>> converting back to the Table API (with `t_env.from_data_stream(ds)`)
>>> makes it very difficult to serialize the data.
>>>
>>> What I am not sure about are the following things:
>>> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
>>> 2. What should my schema (for output_type) look like given the
>>> structure of the message: https://pastebin.com/kEFBJSBS
>>>
>>> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of
>>> weird) error: https://pastebin.com/KpkZEHtd - here, it seems to me that
>>> having `Row` inside `Row` is causing some issues.
>>>
>>> Thanks!
>>>
>>> --
>>> Dhavan
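For completeness, here is a minimal sketch of the second option Dian mentions, declaring the schema explicitly when converting with `t_env.from_data_stream(ds, schema)`. The source stream is a placeholder standing in for the real `process()` output, and it assumes the `Schema` builder API available in PyFlink 1.15:

    from pyflink.common import Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import DataTypes, Schema, StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Placeholder stream; in the thread this would be the result of
    # ds.process(..., output_type=...).
    ds = env.from_collection(
        [Row(relatedPlaylists=Row(likes=1, uploads="1"))],
        type_info=Types.ROW_NAMED(
            ["relatedPlaylists"],
            [Types.ROW_NAMED(["likes", "uploads"],
                             [Types.INT(), Types.STRING()])]))

    # Declare the schema explicitly at conversion time instead of relying
    # on type inference, so the resulting table is not typed as RAW.
    schema = (
        Schema.new_builder()
        .column(
            "relatedPlaylists",
            DataTypes.ROW([
                DataTypes.FIELD("likes", DataTypes.INT()),
                DataTypes.FIELD("uploads", DataTypes.STRING()),
            ]))
        .build())

    table = t_env.from_data_stream(ds, schema)
    table.print_schema()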