Hello,

I’m trying to send a message from the DataStream API to a remote Stateful Function (Java SDK). The DataStream job:

    public static final FunctionType DEVICE =
        new FunctionType("com.github.f1xman.era.anomalydetection.device", "DeviceFunction");

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
        statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);

        DataStreamSource<String> names = env.addSource(new NamesSourceFunction());

        DataStream<RoutableMessage> namesIngress = names.map(name -> RoutableMessageBuilder.builder()
            .withTargetAddress(DEVICE, name)
            .withMessageBody(name)
            .build());

        StatefulFunctionDataStreamBuilder.builder("example")
            .withDataStreamAsIngress(namesIngress)
            .withRequestReplyRemoteFunction(
                requestReplyFunctionBuilder(DEVICE, URI.create("http://localhost:8080/statefun"))
            )
            .withConfiguration(statefunConfig)
            .build(env);

        env.execute("Flink Streaming Java API Skeleton");
    }

When a String value is passed to .withMessageBody(...), the following exception occurs:

    java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.statefun.sdk.reqreply.generated.TypedValue
        at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:118)
        at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
        ... 25 more

Sending a String value to an embedded function works fine, though. The workaround I've found is to wrap the value in a TypedValue:

    .withMessageBody(TypedValue.newBuilder()
        .setValue(ByteString.copyFrom(name, StandardCharsets.UTF_8))
        .setHasValue(true)
        .setTypename("example/Name")
        .build()
    )

This approach requires the receiver function to unwrap the TypedValue and deserialize the ByteString itself, which looks too low-level for this kind of API. I suspect I'm using the Stateful Functions SDK for Flink DataStream Integration incorrectly.

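To make the low-level concern concrete, here is a minimal sketch of the round trip this workaround forces on both sides. It uses only the JDK: the Payload class is a hypothetical stand-in for the generated TypedValue (a typename plus raw bytes), and wrap/unwrap are illustrative helper names, not part of any StateFun API.

```java
import java.nio.charset.StandardCharsets;

public class NamePayloadRoundTrip {

    // Hypothetical stand-in for TypedValue: a typename plus the raw payload bytes.
    static final class Payload {
        final String typename;
        final byte[] value;

        Payload(String typename, byte[] value) {
            this.typename = typename;
            this.value = value;
        }
    }

    // Typename taken from the workaround above.
    static final String NAME_TYPENAME = "example/Name";

    // Sender side: encode the String as UTF-8 bytes and tag it with a typename.
    static Payload wrap(String name) {
        return new Payload(NAME_TYPENAME, name.getBytes(StandardCharsets.UTF_8));
    }

    // Receiver side: check the typename, then decode the bytes back into a String.
    static String unwrap(Payload payload) {
        if (!NAME_TYPENAME.equals(payload.typename)) {
            throw new IllegalArgumentException("Unexpected typename: " + payload.typename);
        }
        return new String(payload.value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String decoded = unwrap(wrap("device-42"));
        System.out.println(decoded);
    }
}
```

Both sides have to agree on the typename and the byte encoding by hand, which is the part I would expect the SDK to handle for me.
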
What is the correct way to implement Flink DataStream and remote Stateful Functions interoperability?

The job is inspired by the official examples: https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/flink-datastream/

Best,
Tymur Yarosh