Hi Dian,

This is great news, really! Hope to see this soon in stable :D
Thanks a lot!

On Fri, 13 May 2022 at 15:25, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Dhavan,
>
> This should be a known issue, which has already been fixed in
> https://issues.apache.org/jira/browse/FLINK-27282. However, this ticket
> was only fixed recently, so the fix has not been released yet.
>
> Regards,
> Dian
>
> On Thu, May 12, 2022 at 7:26 PM Dhavan Vaidya <dhavan.vai...@kofluence.com> wrote:
>
>> Hey Dian,
>>
>> Though my HTTP call's response is indeed JSON, I need to serialize the
>> data into Avro for the Kafka sink.
>>
>> Since you said it supports Row inside Row, I investigated further and
>> found out that, because the Row class *sorts by key names*, I have to
>> strictly follow that order in the `output_type` parameter of the
>> `process()` function! For example, if my ProcessFunction's output is
>> defined as follows (that is, it yields a structure of this type):
>>
>> Row(
>>     relatedPlaylists=Row(
>>         likes=1,
>>         uploads="1",
>>     )
>> )
>>
>> then in `ds = ds.process(MyProcessFunction(), output_type=output_type)`,
>> the output type must be:
>>
>> Types.ROW_NAMED(["relatedPlaylists"],
>>                 [Types.ROW_NAMED(["likes", "uploads"],
>>                                  [Types.INT(), Types.STRING()])])
>>
>> I cannot swap the order of "likes" and "uploads". That is, the
>> following is wrong:
>>
>> Types.ROW_NAMED(["relatedPlaylists"],
>>                 [Types.ROW_NAMED(["uploads", "likes"],
>>                                  [Types.STRING(), Types.INT()])])
>>
>> After making these changes, things indeed work out well: I am able to
>> convert back to the Table API and sink into Kafka with Confluent Avro
>> serialization. Though I do find this ordering very cumbersome :D
>>
>> On Wed, 11 May 2022 at 11:35, Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Dhavan,
>>>
>>> The type of the `ds` appearing in `t_env.from_data_stream(ds)` should
>>> be known. Otherwise, it's impossible to infer the schema of the
>>> converted table; as a result, the `raw` type will be used, which makes
>>> the schema of the resulting table not what you expect. You could
>>> either declare the type of the `ds` in the DataStream API via
>>> `output_type`, or declare the schema via
>>> `t_env.from_data_stream(ds, schema)` when converting the DataStream to
>>> a table.
>>>
>>> >> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
>>> It doesn't necessarily have to be the Row type. You could also use
>>> other types, e.g. Integer, String, etc.
>>>
>>> >> 2. What should my schema (for output_type) look like given the
>>> >> structure of the message: https://pastebin.com/kEFBJSBS
>>> Is the output data a JSON string? If that is the case, I guess you
>>> could just declare the output_type as Types.STRING().
>>>
>>> >> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of
>>> >> weird) error: https://pastebin.com/KpkZEHtd - here, it seems to me
>>> >> that having `Row` inside `Row` is causing some issues.
>>> It supports `Row` inside `Row`. Could you share an example that could
>>> reproduce this issue?
>>>
>>> Regards,
>>> Dian
>>>
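To make the key-ordering rule above concrete, here is a minimal sketch of the pattern Dhavan describes, combined with Dian's advice to declare the type via `output_type`. The class name EnrichFunction is a hypothetical placeholder, and `ds` stands for the DataStream obtained earlier; neither is code from the thread:

    from pyflink.common import Row
    from pyflink.common.typeinfo import Types
    from pyflink.datastream.functions import ProcessFunction


    class EnrichFunction(ProcessFunction):
        # Hypothetical stand-in for the async-enrichment ProcessFunction.
        def process_element(self, value, ctx):
            # A Row built from keyword arguments is sorted by field name
            # ("likes" < "uploads"), regardless of the order written here.
            yield Row(relatedPlaylists=Row(likes=1, uploads="1"))


    # The declared output type must list the nested fields in that same
    # order, otherwise values and field names get misaligned downstream.
    output_type = Types.ROW_NAMED(
        ["relatedPlaylists"],
        [Types.ROW_NAMED(["likes", "uploads"], [Types.INT(), Types.STRING()])],
    )

    # ds is the DataStream obtained earlier, e.g. via t_env.to_data_stream(...).
    ds = ds.process(EnrichFunction(), output_type=output_type)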
>>> On Tue, May 10, 2022 at 9:09 PM Dhavan Vaidya <dhavan.vai...@kofluence.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am consuming Kafka messages with the Table API connector. I cannot
>>>> use the DataStream API because it does not support Confluent Avro.
>>>>
>>>> After consuming the messages, I am converting to the DataStream API
>>>> and using a ProcessFunction. The ProcessFunction makes async HTTP
>>>> calls and emits results with a completely different structure. This
>>>> works well: if I don't do anything and just do `ds.print()`, things
>>>> are printed as expected.
>>>>
>>>> The issues start happening when I convert from the DataStream API
>>>> back to the Table API. To do this effectively and sink back to Kafka
>>>> (with Confluent Avro serialization), I am specifying `output_type`
>>>> while calling `process` on the stream. I have gathered (there might
>>>> be other ways I am unaware of) that if I don't specify the schema
>>>> here, converting back to the Table API (with
>>>> `t_env.from_data_stream(ds)`) makes it very difficult to serialize
>>>> the data.
>>>>
>>>> What I am not sure about are the following things:
>>>> 1. Must I create objects of Flink's datatypes like `pyflink.common.Row`?
>>>> 2. What should my schema (for output_type) look like given the
>>>>    structure of the message: https://pastebin.com/kEFBJSBS
>>>>
>>>> I tried https://pastebin.com/EPT0pB85 but I am getting a (kind of
>>>> weird) error: https://pastebin.com/KpkZEHtd - here, it seems to me
>>>> that having `Row` inside `Row` is causing some issues.
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>> Dhavan
>>>
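For completeness, a rough sketch of the overall round trip discussed in this thread could look like the following. It reuses the hypothetical EnrichFunction and output_type from the earlier sketch; the table names "source_topic" and "sink_topic", and the assumption that both are already registered as Kafka connector tables with the 'avro-confluent' format, are illustrative and not taken from the thread:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Read with a Table API Kafka connector table (declared elsewhere with
    # the 'avro-confluent' format), then bridge to the DataStream API.
    src_table = t_env.from_path("source_topic")
    ds = t_env.to_data_stream(src_table)

    # Enrich via the ProcessFunction; declaring output_type here is what
    # lets from_data_stream() infer a proper schema instead of falling
    # back to a raw type.
    ds = ds.process(EnrichFunction(), output_type=output_type)

    # Back to the Table API; an explicit schema could also be passed via
    # t_env.from_data_stream(ds, schema), as mentioned earlier in the thread.
    result_table = t_env.from_data_stream(ds)

    # Sink into the Kafka table that uses Confluent Avro serialization.
    result_table.execute_insert("sink_topic").wait()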