Hello,

I've found a ticket that talks about very high-level improvements to the Table
API [1]. Are there any more concrete pointers for migrating from the DataSet API
to the Table API? Will it be possible at all to use POJOs with the Table API?

[1] https://issues.apache.org/jira/browse/FLINK-20787

Regards,
Alexis.

From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Thursday, 5 August 2021 15:49
To: user@flink.apache.org
Subject: Using POJOs with the table API

Hi everyone,

I had been using the DataSet API until now, but since that's been deprecated, I 
started looking into the Table API. In my DataSet job I have a lot of POJOs, 
all of which are even annotated with @TypeInfo and provide the corresponding 
factories. The Table API documentation talks about POJOs here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#user-defined-data-types

I started with a single UDF to try it out (an AggregateFunction), but I have 
encountered several issues.

My accumulator class is a (serializable) POJO, but initially there were
validation exceptions because the specified class is abstract. I added the
following hint to the function class and got past that:

@FunctionHint(
        accumulator = @DataTypeHint(value = "RAW", bridgedTo = MyAccumulator.class)
)
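For reference, the Table API documentation on user-defined data types lists what a class needs in order to be extracted as a structured type: it must be public, static (if nested) and not abstract, and it must offer either a zero-argument constructor or a full constructor that assigns all fields. A rough plain-Java sketch of a pre-flight check for some of those rules follows; StructuredTypeCheck and its messages are hypothetical, it only covers the zero-argument-constructor variant and ignores the field-readability rules, so it is no substitute for Flink's own validation.

```java
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): reports violations of a subset of
// the documented structured-type rules for a given class.
final class StructuredTypeCheck {
    private StructuredTypeCheck() {}

    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        int mods = clazz.getModifiers();

        if (!Modifier.isPublic(mods)) {
            problems.add("class is not public");
        }
        // Abstract classes (and interfaces) cannot be instantiated by the framework.
        if (Modifier.isAbstract(mods)) {
            problems.add("class is abstract");
        }
        // A nested class must be static; an inner class needs an enclosing instance.
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            problems.add("nested class is not static");
        }
        // getConstructor() with no arguments only finds a *public* no-arg constructor.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            problems.add("no public zero-argument constructor");
        }
        return problems;
    }
}
```

Calling StructuredTypeCheck.violations(MyAccumulator.class) before wiring up the UDF would surface the abstract-class problem without starting a job.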

Then there were exceptions about the output type. Since it's also a POJO, I 
thought this would help:

@FunctionHint(
        accumulator = @DataTypeHint(value = "RAW", bridgedTo = MyAccumulator.class),
        output = @DataTypeHint(value = "RAW", bridgedTo = MyDTO.class)
)

But no luck: org.apache.flink.table.api.TableException: Unsupported conversion 
from data type 'RAW('com.MyDTO', '...')' (conversion class: com.MyDTO) to type 
information. Only data types that originated from type information fully 
support a reverse conversion.

I figured I would try something simpler and first return a List<String> from my 
AggregateFunction. But how do I define that in a DataTypeHint? I'm not sure if 
that's documented, but I looked through LogicalTypeParser and used:

output = @DataTypeHint("ARRAY<STRING>")

But that throws an exception (see attachment): Table program cannot be 
compiled. This is a bug. Please file an issue.

I changed the List<String> to String[] and that finally worked.
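A minimal sketch of that String[] workaround, assuming a hypothetical accumulator that collects names into a List<String> and converts to an array only when the result is emitted. The method names createAccumulator, accumulate and getValue follow Flink's AggregateFunction contract, but CollectNames deliberately does not extend AggregateFunction so the snippet compiles without Flink on the classpath; in the real UDF it would extend AggregateFunction<String[], NamesAccumulator>, and each class would be public in its own file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator: a mutable POJO with a public no-arg constructor
// and a public field holding the intermediate List<String>.
class NamesAccumulator {
    public List<String> names = new ArrayList<>();

    public NamesAccumulator() {}
}

// Hypothetical aggregate. In a real job this would extend
// AggregateFunction<String[], NamesAccumulator>; here it is plain Java.
class CollectNames {
    // Creates an empty accumulator for each group.
    public NamesAccumulator createAccumulator() {
        return new NamesAccumulator();
    }

    // Called once per input row; null inputs are skipped.
    public void accumulate(NamesAccumulator acc, String name) {
        if (name != null) {
            acc.names.add(name);
        }
    }

    // Converts the List<String> to String[] only when emitting the result,
    // so the function's result type is String[] (the variant that worked)
    // rather than List<String>.
    public String[] getValue(NamesAccumulator acc) {
        return acc.names.toArray(new String[0]);
    }
}
```

Skipping null inputs is a design choice of this sketch, not something Flink requires.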

Even getting a simple test running was difficult. I simply could not get
TableEnvironment#fromValues to work with POJOs as input, even though I tried many
combinations of DataTypes.of(MyPojo.class).

At this point I still don't know how to return complex data structures 
encapsulated in POJOs from my UDF. Am I missing something very essential?

Regards,
Alexis.
