Hello!

I'm trying to figure out whether Flink Statefun supports sending object
with class that has generic parameter types (and potentially nested types).
For example, I send a message that looks like this:

context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject,
listOfLongObject, Long));

And obviously I'm getting complaints like this:

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract
TypeInformation from Class alone, because generic parameters are missing.
Please use TypeInformation.of(TypeHint) instead, or another equivalent
method in the API that accepts a TypeHint instead of a Class. For example
for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
at
org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214)
at
org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at
org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49)
at
org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100)
at
org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54)
at org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39)
at
org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42)
at
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
at
org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
at
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:75)
at
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:52)
at
org.apache.flink.statefun.flink.core.functions.LocalSink.accept(LocalSink.java:36)
at
org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:92)
at org.apache.flink.statefun.sdk.Context.send(Context.java:88)
at
benchmark.HotItemsPersisted$ParseEventFunction.invoke(HotItemsPersisted.java:292)
at
org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)


Is there any API function that statefun support for parameterized class
like this or does the user function need to handle the serialization
process -- or is there anyway to quickly modify statefun message interface
to support this functionality.

Thanks!

Le

Reply via email to