Hello, I’m trying to send a message from DataStream API to a remote Stateful 
Function (Java SDK). The DataStream job:


    public static final FunctionType DEVICE = new 
FunctionType("com.github.f1xman.era.anomalydetection.device", "DeviceFunction");

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

        StatefulFunctionsConfig statefunConfig = 
StatefulFunctionsConfig.fromEnvironment(env);
        statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);

        DataStreamSource<String> names = env.addSource(new 
NamesSourceFunction());
        DataStream<RoutableMessage> namesIngress = names.map(name -> 
RoutableMessageBuilder.builder()
                .withTargetAddress(DEVICE, name)
                .withMessageBody(name)
                .build());
        StatefulFunctionDataStreamBuilder.builder("example")
                .withDataStreamAsIngress(namesIngress)
                .withRequestReplyRemoteFunction(
                        requestReplyFunctionBuilder(DEVICE, 
URI.create("http://localhost:8080/statefun";))
                )
                .withConfiguration(statefunConfig)
                .build(env);

        env.execute("Flink Streaming Java API Skeleton");
    }


When String value passed to .withMessageBody(...) the following exception 
occurred:


java.lang.ClassCastException: java.lang.String cannot be cast to 
org.apache.flink.statefun.sdk.reqreply.generated.TypedValue
    at 
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:118)
    at 
org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
    ... 25 more


Though, sending a String value to an embedded function works well. The 
workaround I've found is to wrap the value with TypedValue:

                .withMessageBody(TypedValue.newBuilder()
                        .setValue(ByteString.copyFrom(name, 
StandardCharsets.UTF_8))
                        .setHasValue(true)
                        .setTypename("example/Name")
                        .build()
                )

This approach requires the receiver function to unwrap the TypedValue and 
deserialize the ByteString. It looks too low-level for this kind of API. I 
believe this is the wrong usage of Stateful Function's SDK for Flink DataStream 
Integration. What is the correct way to implement Flink DataStream and remote 
Stateful Functions interoperability?

The job is inspired by the official examples 
https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/flink-datastream/

Best,
Tymur Yarosh

Reply via email to