Hi,
I am trying to use a custom egress connector in Flink StateFun, but I run
into an error at runtime.
I have implemented the following code.
File: ModuleWithSinkSpec.java
import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithSinkSpec implements StatefulFunctionModule {
  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    System.out.println("StatefulFunctionModule");
    EgressIdentifier<TypedValue> id =
        new EgressIdentifier<>("e-commerce.fns", "custom-sink", TypedValue.class);
    EgressSpec<TypedValue> spec = new SinkFunctionSpec<>(id, new PrintSinkFunction());
    binder.bindEgress(spec);
  }
}
File: PrintSinkFunction.java
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class PrintSinkFunction implements SinkFunction<TypedValue> {
  @Override
  public void invoke(TypedValue value, Context context) throws Exception {
    System.out.println("PrintSinkFunction: " + value);
  }
}
File: TempUserLoginFn.java
import static Marketplace.Types.Messages.USER_LOGIN_JSON_TYPE;
import Marketplace.Types.Entity.TmpUserLogin;
import Marketplace.Constant.Constants;
import org.apache.flink.statefun.sdk.java.*;
import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
import org.apache.flink.statefun.sdk.java.message.Message;
import java.util.concurrent.CompletableFuture;

public class TempUserLoginFn implements StatefulFunction {
  static final TypeName TYPE = TypeName.typeNameOf(Constants.FUNS_NAMESPACE, "login");
  static final ValueSpec<Integer> SEEN = ValueSpec.named("seen").withIntType();
  // Must match the namespace and name of the egress bound in ModuleWithSinkSpec.
  static final TypeName CUSTOM_EGRESS = TypeName.typeNameOf("e-commerce.fns", "custom-sink");

  public static final StatefulFunctionSpec SPEC =
      StatefulFunctionSpec.builder(TYPE)
          .withValueSpec(SEEN)
          .withSupplier(TempUserLoginFn::new)
          .build();

  @Override
  public CompletableFuture<Void> apply(Context context, Message message) {
    if (message.is(USER_LOGIN_JSON_TYPE)) {
      final TmpUserLogin tmpUserLogin = message.as(USER_LOGIN_JSON_TYPE);
      int seen = context.storage().get(SEEN).orElse(0);
      context.storage().set(SEEN, seen + 1);
      context.send(
          EgressMessageBuilder.forEgress(CUSTOM_EGRESS)
              .withValue("Hello " + tmpUserLogin.getUsername() + " for the " + seen + "th time!")
              .build());
    }
    // Return outside the if so that every code path yields a result.
    return context.done();
  }
}
I also register the module through a service file in the META-INF/services
directory. The file is located at
src/main/resources/META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
and contains the single line:
Marketplace.Funs.ModuleWithSinkSpec
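As a sanity check that the service file really ends up inside the packaged
jar, a small sketch like the following could be used. It relies only on the
Java standard library. The names ServiceFileCheck and listProviders are
hypothetical and not part of my project or of StateFun, and the default jar
path assumes the jar sits in the working directory.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;
import java.util.zip.ZipEntry;

// Hypothetical helper (not part of StateFun): lists the provider classes
// declared in the StatefulFunctionModule service file of a jar.
public class ServiceFileCheck {
    static final String SERVICE_ENTRY =
        "META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule";

    // Returns the declared provider class names, or an empty list if the
    // service file is missing from the jar.
    static List<String> listProviders(String jarPath) throws IOException {
        List<String> providers = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            ZipEntry entry = jar.getEntry(SERVICE_ENTRY);
            if (entry == null) {
                return providers;
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Service files allow '#' comments; drop them and blank lines.
                    int hash = line.indexOf('#');
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty()) {
                        providers.add(line);
                    }
                }
            }
        }
        return providers;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default path; pass the real jar location as the first argument.
        String jarPath = args.length > 0 ? args[0] : "EmbedFlink-1.0-SNAPSHOT.jar";
        System.out.println(listProviders(jarPath));
    }
}
```

If the printed list is empty or does not contain
Marketplace.Funs.ModuleWithSinkSpec, the module cannot be discovered from
this jar. A correct listing only shows that the jar is packaged as intended;
it does not show that the runtime actually loads the jar.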
Furthermore, I packaged ModuleWithSinkSpec.java and PrintSinkFunction.java
along with their dependencies into EmbedFlink-1.0-SNAPSHOT.jar and mounted
it into the StateFun runtime container:
statefun:
  image: apache/flink-statefun-playground:3.2.0-1.0
  ports:
    - "8081:8081"
    - "8090:8090"
    - "8091:8091"
  volumes:
    - ./module.yaml:/module.yaml
    - ./EmbedFlink-1.0-SNAPSHOT.jar:/opt/statefun/modules/my-embedded/EmbedFlink-1.0-SNAPSHOT.jar
But when I run the application and send events to the ingress, the following
error occurs:
e-commerce-master-statefun-1 | 11:24:18,595 WARN org.apache.flink.runtime.taskmanager.Task - feedback-union -> functions -> Sink: e-commerce.fns-kafkaSink-egress (4/8)#0 (b6bf4b29507bf0d86bf23a6e3a30bd82) switched from RUNNING to FAILED with failure cause:
org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: An error occurred when attempting to invoke function FunctionType(e-commerce.fns, login).
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
master-statefun-1 | at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
master-statefun-1 | at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
master-statefun-1 | at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
master-statefun-1 | at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
master-statefun-1 | at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
master-statefun-1 | at java.base/java.lang.Thread.run(Thread.java:829)
master-statefun-1 | Caused by: java.lang.IllegalArgumentException: Unknown egress EgressKey(e-commerce.fns, custom-sink, class org.apache.flink.statefun.sdk.reqreply.generated.TypedValue)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.SideOutputSink.accept(SideOutputSink.java:44)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:99)
master-statefun-1 | at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.handleEgressMessages(RequestReplyFunction.java:248)
master-statefun-1 | at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.handleInvocationResultResponse(RequestReplyFunction.java:207)
master-statefun-1 | at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:178)
master-statefun-1 | at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124)
master-statefun-1 | at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
master-statefun-1 | ... 16 more
I would greatly appreciate it if you could provide me with some suggestions.
Best wishes