Hello, I've found a ticket that talks about very high-level improvements to the Table API [1]. Are there any more concrete pointers for migration from DataSet to Table API? Will it be possible at all to use POJOs with the Table API?
[1] https://issues.apache.org/jira/browse/FLINK-20787

Regards,
Alexis.

From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Thursday, 5 August 2021 15:49
To: user@flink.apache.org
Subject: Using POJOs with the table API

Hi everyone,

I had been using the DataSet API until now, but since it has been deprecated, I started looking into the Table API. In my DataSet job I have a lot of POJOs, all of which are annotated with @TypeInfo and provide the corresponding factories. The Table API documentation talks about POJOs here:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#user-defined-data-types

I started with a single UDF to try it out (an AggregateFunction), but I have encountered several issues.

My accumulator class is a (serializable) POJO, but initially there were validation exceptions because the specified class is abstract. I added this to the class and got past that:

    @FunctionHint(
        accumulator = DataTypeHint("RAW", bridgedTo = MyAccumulator.class)
    )

Then there were exceptions about the output type. Since it's also a POJO, I thought this would help:

    @FunctionHint(
        accumulator = DataTypeHint("RAW", bridgedTo = MyAccumulator.class),
        output = DataTypeHint("RAW", bridgedTo = MyDTO.class)
    )

But no luck:

    org.apache.flink.table.api.TableException: Unsupported conversion from data type 'RAW('com.MyDTO', '...')' (conversion class: com.MyDTO) to type information. Only data types that originated from type information fully support a reverse conversion.

I figured I would try something simpler and first return a List<String> from my AggregateFunction. But how do I define that in a DataTypeHint? I'm not sure if that's documented, but I looked through LogicalTypeParser and used:

    output = DataTypeHint("ARRAY<STRING>")

But that throws an exception (see attachment):

    Table program cannot be compiled. This is a bug. Please file an issue.

I changed the List<String> to String[] and that finally worked.

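For reference, the shape of the variant that worked can be sketched roughly as follows. This is plain Java with no Flink dependency, so it only illustrates the accumulator and the String[] result. The names CollectStringsSketch and CollectAcc are hypothetical, and in the actual job the class would extend AggregateFunction and carry the @FunctionHint shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch only (assumption: names are made up for illustration).
// In a Flink job this class would extend AggregateFunction<String[], CollectAcc>.
final class CollectStringsSketch {

    /** Hypothetical accumulator: concrete, with an implicit public no-arg constructor. */
    public static class CollectAcc {
        public List<String> values = new ArrayList<>();
    }

    /** Creates an empty accumulator for a new group. */
    public CollectAcc createAccumulator() {
        return new CollectAcc();
    }

    /** Adds one input value to the accumulator, skipping nulls. */
    public void accumulate(CollectAcc acc, String value) {
        if (value != null) {
            acc.values.add(value);
        }
    }

    /** Returns the result as String[] instead of List<String>. */
    public String[] getValue(CollectAcc acc) {
        return acc.values.toArray(new String[0]);
    }
}
```

The accumulator can keep a List<String> internally; only the declared result type is an array here.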
Even getting a simple test running was difficult. I simply could not get TableEnvironment#fromValues to work with POJOs as input, and I tried many combinations of DataTypes.of(MyPojo.class).

At this point I still don't know how to return complex data structures encapsulated in POJOs from my UDF. Am I missing something very essential?

Regards,
Alexis.
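
For context, the documentation page linked above lists structural requirements for classes used as structured types. Some of them can be approximated with reflection, as in the sketch below. It is not Flink's actual validation: the class name PojoShapeCheck and the two sample classes are hypothetical, and the check is deliberately simplified (it requires a public no-arg constructor and readable fields, and does not look at setters or at a fully assigning constructor).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Simplified, hypothetical approximation of POJO shape rules; not Flink code.
final class PojoShapeCheck {
    private PojoShapeCheck() {}

    /** Returns true if the class is public, concrete, and has readable fields. */
    static boolean looksLikeTablePojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods) || Modifier.isAbstract(mods)) {
            return false; // must be public and not abstract
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) {
            return false; // nested classes must be static
        }
        try {
            clazz.getConstructor(); // assumption: only the no-arg variant is checked
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : clazz.getDeclaredFields()) {
            int fm = field.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || field.isSynthetic()) {
                continue; // static, transient, and compiler-generated fields are skipped
            }
            if (!Modifier.isPublic(fm) && !hasGetter(clazz, field)) {
                return false; // field is neither public nor exposed via a getter
            }
        }
        return true;
    }

    /** Looks for a public getX() or isX() method matching the field name. */
    private static boolean hasGetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        for (String prefix : new String[] {"get", "is"}) {
            try {
                clazz.getMethod(prefix + suffix);
                return true;
            } catch (NoSuchMethodException ignored) {
                // try the next prefix
            }
        }
        return false;
    }

    /** Hypothetical sample that satisfies the simplified rules. */
    public static class GoodPojo {
        public String name;
        private int count;
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }

    /** Hypothetical sample that fails because it is abstract. */
    public abstract static class AbstractAcc {
        public String name;
    }
}
```

A check like this would flag an abstract accumulator class before it reaches the planner, but it says nothing about how the fields map to SQL data types.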