Hi,

I've been trying to use a custom egress connector in Flink StateFun, but I
have run into an error.

I have implemented the following code.

File: ModuleWithSinkSpec.java


import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithSinkSpec implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        System.out.println("StatefulFunctionModule");
        EgressIdentifier<TypedValue> id =
                new EgressIdentifier<>("e-commerce.fns", "custom-sink", TypedValue.class);
        EgressSpec<TypedValue> spec = new SinkFunctionSpec<>(id, new PrintSinkFunction());
        binder.bindEgress(spec);
    }
}


File: PrintSinkFunction.java

import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class PrintSinkFunction implements SinkFunction<TypedValue> {
    @Override
    public void invoke(TypedValue value, Context context) throws Exception {
        System.out.println("PrintSinkFunction: " + value);
    }
}


File: TempUserLoginFn.java

import static Marketplace.Types.Messages.USER_LOGIN_JSON_TYPE;
import Marketplace.Types.Entity.TmpUserLogin;
import Marketplace.Constant.Constants;

import org.apache.flink.statefun.sdk.java.*;
import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
import org.apache.flink.statefun.sdk.java.message.Message;

import java.util.concurrent.CompletableFuture;

public class TempUserLoginFn implements StatefulFunction{

    static final TypeName TYPE =
            TypeName.typeNameOf(Constants.FUNS_NAMESPACE, "login");

    static final ValueSpec<Integer> SEEN = ValueSpec.named("seen").withIntType();

    static final TypeName CUSTOM_EGRESS =
            TypeName.typeNameOf("e-commerce.fns", "custom-sink");

    public static final StatefulFunctionSpec SPEC =
            StatefulFunctionSpec.builder(TYPE)
            .withValueSpec(SEEN)
            .withSupplier(TempUserLoginFn::new)
            .build();

    @Override
    public CompletableFuture<Void> apply(Context context, Message message){
        if (message.is(USER_LOGIN_JSON_TYPE)){
            final TmpUserLogin tmpUserLogin = message.as(USER_LOGIN_JSON_TYPE);

            int seen = context.storage().get(SEEN).orElse(0);
            context.storage().set(SEEN, seen + 1);
            context.send(
                    EgressMessageBuilder.forEgress(CUSTOM_EGRESS)
                            .withValue("Hello " + tmpUserLogin.getUsername()
                                    + " for the " + seen + "th time!")
                            .build());
        }
        return context.done();
    }
}


Also, I am using a service file in the META-INF/services directory to
register the module. The file is located at
src/main/resources/META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
and contains the following line:

Marketplace.Funs.ModuleWithSinkSpec

Furthermore, I packaged ModuleWithSinkSpec.java and PrintSinkFunction.java,
along with their dependencies, into EmbedFlink-1.0-SNAPSHOT.jar and mounted
it into the StateFun runtime image:
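
One way to double-check the packaging step is to confirm that the service
file actually ended up inside the JAR. The following is only a minimal
sketch using the plain JDK; ServiceEntryCheck and registeredModules are
hypothetical helper names (not part of StateFun), and the default JAR path
is the file name from this post.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper (not part of StateFun): lists the class names that a
// JAR registers under the StatefulFunctionModule service file.
final class ServiceEntryCheck {

    // Path of the service file inside the JAR, as described in this post.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule";

    // Returns the class names listed in the service file, or an empty list
    // if the JAR does not contain that file.
    static List<String> registeredModules(String jarPath) throws IOException {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_FILE);
            if (entry == null) {
                return names;
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Service files allow '#' comments and blank lines; skip both.
                    int hash = line.indexOf('#');
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty()) {
                        names.add(line);
                    }
                }
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the JAR is in the working directory unless a path is given.
        String jarPath = args.length > 0 ? args[0] : "EmbedFlink-1.0-SNAPSHOT.jar";
        System.out.println(registeredModules(jarPath));
    }
}
```

An empty list here would mean the service file is missing from the JAR. A
list containing Marketplace.Funs.ModuleWithSinkSpec only shows that the JAR
is packaged as intended; it does not show that the runtime loads it.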

  statefun:
    image: apache/flink-statefun-playground:3.2.0-1.0
    ports:
      - "8081:8081"
      - "8090:8090"
      - "8091:8091"
    volumes:
      - ./module.yaml:/module.yaml
      - ./EmbedFlink-1.0-SNAPSHOT.jar:/opt/statefun/modules/my-embedded/EmbedFlink-1.0-SNAPSHOT.jar




But when I run the application and send events to the ingress, the following
error occurs:

e-commerce-master-statefun-1  | 11:24:18,595 WARN
 org.apache.flink.runtime.taskmanager.Task                     -
feedback-union -> functions -> Sink: e-commerce.fns-kafkaSink-egress (4
/8)#0 (b6bf4b29507bf0d86bf23a6e3a30bd82) switched from RUNNING to FAILED
with failure cause:
org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException:
An error
occurred when attempting to invoke function FunctionType(e-commerce.fns,
login).
master-statefun-1  |         at
org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
master-statefun-1  |         at
org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74)
master-statefun-1  |         at
org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60)
master-statefun-1  |         at
org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164)
master-statefun-1  |         at
org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119)
master-statefun-1  |         at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
master-statefun-1  |         at
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
master-statefun-1  |         at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
master-statefun-1  |         at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
master-statefun-1  |         at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
master-statefun-1  |         at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
master-statefun-1  |         at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
master-statefun-1  |         at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
master-statefun-1  |         at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
master-statefun-1  |         at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
master-statefun-1  |         at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
master-statefun-1  |         at
java.base/java.lang.Thread.run(Thread.java:829)
master-statefun-1  |   Caused by: java.lang.IllegalArgumentException:
Unknown egress EgressKey(e-commerce.fns, custom-sink, class
org.apache.flink.statefun.sdk.reqreply.generated.TypedValue)
master-statefun-1  |           at
org.apache.flink.statefun.flink.core.functions.SideOutputSink.accept(SideOutputSink.java:44)
master-statefun-1  |           at
org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:99)
master-statefun-1  |             at
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.handleEgressMessages(RequestReplyFunction.java:248)
master-statefun-1  |             at
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.handleInvocationResultResponse(RequestReplyFunction.java:207)
master-statefun-1  |             at
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:178)
master-statefun-1  |             at
org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124)
master-statefun-1  |             at
org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
master-statefun-1  |             ... 16 more

I would greatly appreciate it if you could provide me with some suggestions.

Best wishes
