Hi, I've been trying to use a custom egress connector in Flink StateFun, but I've run into an error.
I have implemented the following code.

File: ModuleWithSinkSpec.java

import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithSinkSpec implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    System.out.println("StatefulFunctionModule");
    EgressIdentifier<TypedValue> id =
        new EgressIdentifier<>("e-commerce.fns", "custom-sink", TypedValue.class);
    EgressSpec<TypedValue> spec = new SinkFunctionSpec<>(id, new PrintSinkFunction());
    binder.bindEgress(spec);
  }
}

File: PrintSinkFunction.java

import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class PrintSinkFunction implements SinkFunction<TypedValue> {

  @Override
  public void invoke(TypedValue value, Context context) throws Exception {
    System.out.println("PrintSinkFunction: " + value);
  }
}

File: TempUserLoginFn.java

import static Marketplace.Types.Messages.USER_LOGIN_JSON_TYPE;

import Marketplace.Types.Entity.TmpUserLogin;
import Marketplace.Constant.Constants;
import org.apache.flink.statefun.sdk.java.*;
import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
import org.apache.flink.statefun.sdk.java.message.Message;
import java.util.concurrent.CompletableFuture;

public class TempUserLoginFn implements StatefulFunction {

  static final TypeName TYPE = TypeName.typeNameOf(Constants.FUNS_NAMESPACE, "login");
  static final ValueSpec<Integer> SEEN = ValueSpec.named("seen").withIntType();
  static final TypeName CUSTOM_EGRESS = TypeName.typeNameOf("e-commerce.fns", "custom-sink");

  public static final StatefulFunctionSpec SPEC =
      StatefulFunctionSpec.builder(TYPE)
          .withValueSpec(SEEN)
          .withSupplier(TempUserLoginFn::new)
          .build();

  @Override
  public CompletableFuture<Void> apply(Context context, Message message) {
    if (message.is(USER_LOGIN_JSON_TYPE)) {
      final TmpUserLogin tmpUserLogin = message.as(USER_LOGIN_JSON_TYPE);
      int seen = context.storage().get(SEEN).orElse(0);
      context.storage().set(SEEN, seen + 1);
      context.send(
          EgressMessageBuilder.forEgress(CUSTOM_EGRESS)
              .withValue("Hello " + tmpUserLogin.getUsername() + " for the " + seen + "th time!")
              .build());
    }
    // Complete the invocation whether or not the message type matched.
    return context.done();
  }
}

I also added a service file that registers the module. It is located at

src/main/resources/META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule

and contains:

Marketplace.Funs.ModuleWithSinkSpec

Furthermore, I packaged ModuleWithSinkSpec.java and PrintSinkFunction.java along with their dependencies into EmbedFlink-1.0-SNAPSHOT.jar and mounted it into the StateFun runtime image:

statefun:
  image: apache/flink-statefun-playground:3.2.0-1.0
  ports:
    - "8081:8081"
    - "8090:8090"
    - "8091:8091"
  volumes:
    - ./module.yaml:/module.yaml
    - ./EmbedFlink-1.0-SNAPSHOT.jar:/opt/statefun/modules/my-embedded/EmbedFlink-1.0-SNAPSHOT.jar

But when I run the application and send events to the ingress, the following error occurs:

e-commerce-master-statefun-1 | 11:24:18,595 WARN org.apache.flink.runtime.taskmanager.Task - feedback-union -> functions -> Sink: e-commerce.fns-kafkaSink-egress (4/8)#0 (b6bf4b29507bf0d86bf23a6e3a30bd82) switched from RUNNING to FAILED with failure cause: org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: An error occurred when attempting to invoke function FunctionType(e-commerce.fns, login).
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
master-statefun-1 | at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
master-statefun-1 | at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
master-statefun-1 | at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
master-statefun-1 | at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
master-statefun-1 | at java.base/java.lang.Thread.run(Thread.java:829)
master-statefun-1 | Caused by: java.lang.IllegalArgumentException: Unknown egress EgressKey(e-commerce.fns, custom-sink, class org.apache.flink.statefun.sdk.reqreply.generated.TypedValue)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.SideOutputSink.accept(SideOutputSink.java:44)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:99)
master-statefun-1 | at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.handleEgressMessages(RequestReplyFunction.java:248)
master-statefun-1 | at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.handleInvocationResultResponse(RequestReplyFunction.java:207)
master-statefun-1 | at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:178)
master-statefun-1 | at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
master-statefun-1 | ... 16 more

I would greatly appreciate any suggestions.

Best wishes