I'm sorry, I accidentally hit send before I was finished. Yes, I understand that Flink Stateful Functions run as part of a cluster in which state and throughput are managed by Flink. However, in StatefulFunctions, remote functions are still implemented as remote HTTP micro-services often deployed as part of a Lambda deployment.
However, the examples that are linked to Flink stateful functions, such as those in the Flink Stateful Function playround show HTTP as an entry point into the application. I can't get that to work, and I am not sure if its even correct. > On Apr 16, 2022, at 6:12 PM, Marco Villalobos <mvillalo...@kineteque.com> > wrote: > > IK > > If what you're saying is true, then why do most of the examples in the > flink-statefun-playground example use HTTP as an alternative entry point? > > Here is the greeter example: > > https://github.com/apache/flink-statefun-playground/tree/main/java/greeter > <https://github.com/apache/flink-statefun-playground/tree/main/java/greeter> > > > >> On Apr 16, 2022, at 6:06 PM, Tymur Yarosh <ti.yar...@gmail.com >> <mailto:ti.yar...@gmail.com>> wrote: >> >> Hi Marco, >> The problem is that you’re trying to call the function directly via HTTP. >> This is not how it's supposed to work. Instead, check out how to define your >> statefun module >> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/ >> >> <https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/> >> and how to deploy Stateful Functions >> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview >> >> <https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview>. >> The Stateful Functions application consists of Flink Cluster, ingresses to >> consume incoming data, egresses to produce outgoing data and the HTTP server >> that serves remote Stateful Functions. Also you can checkout the full >> example here >> https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736 >> >> <https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736> >> >> Best, >> Tymur Yarosh >> 14 квіт. 2022 р., 03:51 +0300, Marco Villalobos <mvillalo...@kineteque.com >> <mailto:mvillalo...@kineteque.com>>, писав(-ла): >>> I'm trying to write very simple echo app with Stateful Function to prove it >>> as a technology for some of my use cases. >>> >>> I have not been able to accept different content types though. Here is an >>> example of my code for a simple >>> echo function: >>> >>> My Echo stateful function class. >>> >>> package statefun.implementation.functions; >>> >>> import statefun.api.annotations.Statefun; >>> import statefun.api.factories.StatefulFunctionSpecFactory; >>> import com.fasterxml.jackson.databind.JsonNode; >>> import org.apache.flink.statefun.sdk.java.Context; >>> import org.apache.flink.statefun.sdk.java.StatefulFunction; >>> import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec; >>> import org.apache.flink.statefun.sdk.java.TypeName; >>> import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder; >>> import org.apache.flink.statefun.sdk.java.message.Message; >>> import org.slf4j.Logger; >>> import org.slf4j.LoggerFactory; >>> >>> import java.util.concurrent.CompletableFuture; >>> import java.util.function.Supplier; >>> >>> @Statefun >>> public class EchoFn implements StatefulFunction, >>> StatefulFunctionSpecFactory { >>> >>> private final static Logger logger = >>> LoggerFactory.getLogger(EchoFn.class); >>> >>> private final static TypeName fnType = TypeName.typeNameOf("std.fns", >>> "echo"); >>> >>> public final static StatefulFunctionSpec SPEC = >>> >>> StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build(); >>> >>> @Override >>> public CompletableFuture<Void> apply(Context context, Message message) >>> throws Throwable { >>> >>> if (message.is(Types.JAVA_STRING_TYPE)) { >>> final String request = message.as(Types.JAVA_STRING_TYPE); >>> >>> context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build()); >>> } else if (message.is(Types.JSON_TYPE)) { >>> final JsonNode json = message.as(Types.JSON_TYPE); >>> final String request = json.asText(); >>> >>> context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build()); >>> } >>> >>> return context.done(); >>> } >>> >>> @Override >>> public StatefulFunctionSpec createSpec(Supplier<? extends >>> StatefulFunction> supplier) { >>> return StatefulFunctionSpec.builder(EchoFn.fnType) >>> .withSupplier(supplier) >>> .build(); >>> } >>> >>> @Override >>> public String toString() { >>> return fnType.toString(); >>> } >>> } >>> My Types class, which kind of copies >>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java >>> >>> <https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java> >>> >>> package statefun.implementation.functions; >>> >>> import com.fasterxml.jackson.databind.JsonNode; >>> import com.fasterxml.jackson.databind.ObjectMapper; >>> import org.apache.flink.statefun.sdk.java.TypeName; >>> import org.apache.flink.statefun.sdk.java.types.SimpleType; >>> import org.apache.flink.statefun.sdk.java.types.Type; >>> >>> public class Types { >>> >>> public static final String JAVA_LANG_NAMESPACE = "java.lang"; >>> private final static ObjectMapper objectMapper = new ObjectMapper(); >>> public static final SimpleType.Fn<String, byte[]> javaStringSerialize = >>> input -> input == null ? new byte[0] : input.getBytes(); >>> public static final SimpleType.Fn<byte[], String> javaStringDeserialize >>> = input -> input == null || input.length == 0 ? null : new String(input); >>> >>> public static final Type<String> JAVA_STRING_TYPE = >>> SimpleType.simpleImmutableTypeFrom( >>> TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"), >>> javaStringSerialize, >>> javaStringDeserialize >>> ); >>> >>> public static final Type<JsonNode> JSON_TYPE = >>> SimpleType.simpleImmutableTypeFrom( >>> TypeName.typeNameOf("application", JsonNode.class.getName()), >>> objectMapper::writeValueAsBytes, >>> bytes -> objectMapper.readValue(bytes, JsonNode.class)); >>> } >>> >>> My Undertow Handler, copied from >>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java >>> >>> <https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java> >>> >>> package statefun.implementation.undertow; >>> >>> import io.undertow.server.HttpHandler; >>> import io.undertow.server.HttpServerExchange; >>> import io.undertow.util.Headers; >>> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler; >>> import org.apache.flink.statefun.sdk.java.slice.Slice; >>> import org.apache.flink.statefun.sdk.java.slice.Slices; >>> >>> import java.util.Objects; >>> import java.util.concurrent.CompletableFuture; >>> >>> /** >>> * A simple Undertow {@link HttpHandler} that delegates requests from >>> StateFun runtime processes to >>> * a StateFun {@link RequestReplyHandler}. >>> */ >>> public final class UndertowHttpHandler implements HttpHandler { >>> private final RequestReplyHandler handler; >>> >>> public UndertowHttpHandler(RequestReplyHandler handler) { >>> this.handler = Objects.requireNonNull(handler); >>> } >>> >>> @Override >>> public void handleRequest(HttpServerExchange exchange) { >>> exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody); >>> } >>> >>> private void onRequestBody(HttpServerExchange exchange, byte[] >>> requestBytes) { >>> exchange.dispatch(); >>> CompletableFuture<Slice> future = >>> handler.handle(Slices.wrap(requestBytes)); >>> future.whenComplete((response, exception) -> onComplete(exchange, >>> response, exception)); >>> } >>> >>> private void onComplete(HttpServerExchange exchange, Slice responseBytes, >>> Throwable ex) { >>> if (ex != null) { >>> ex.printStackTrace(System.out); >>> exchange.getResponseHeaders().put(Headers.STATUS, 500); >>> exchange.endExchange(); >>> return; >>> } >>> exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, >>> "application/octet-stream"); >>> exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer()); >>> } >>> } >>> >>> My App Server copied from >>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java >>> >>> <https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java> >>> package statefun; >>> >>> import statefun.implementation.functions.EchoFn; >>> import statefun.implementation.undertow.UndertowHttpHandler; >>> import io.undertow.Undertow; >>> import org.apache.flink.statefun.sdk.java.StatefulFunctions; >>> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler; >>> >>> >>> public final class StatefulFunctionsAppServer { >>> >>> public static void main(String[] args) { >>> final StatefulFunctions functions = new StatefulFunctions(); >>> functions.withStatefulFunction(EchoFn.SPEC); >>> >>> final RequestReplyHandler requestReplyHandler = >>> functions.requestReplyHandler(); >>> final Undertow httpServer = >>> Undertow.builder() >>> .addHttpListener(8080, "0.0.0.0") >>> .setHandler(new UndertowHttpHandler(requestReplyHandler)) >>> .build(); >>> httpServer.start(); >>> } >>> } >>> >>> >>> However, when I run >>> >>> curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' >>> http://localhost:8080/std.fns/echo <http://localhost:8080/std.fns/echo> >>> >>> or >>> >>> curl -X POST -H "Content-Type: >>> application/com.fasterxml.jackson.databind.JsonNode" -d '{"name": "Hello >>> World"}' http://localhost:8080/std.fns/echo >>> <http://localhost:8080/std.fns/echo> >>> >>> I get the following error on the server. >>> >>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException: >>> While parsing a protocol message, the input ended unexpectedly in the >>> middle of a field. This could mean either that the input has been >>> truncated or that an embedded message misreported its own length. >>> at >>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84) >>> at >>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151) >>> at >>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767) >>> at >>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503) >>> at >>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298) >>> at >>> org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65) >>> at >>> org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14) >>> at >>> org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090) >>> at >>> org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084) >>> at >>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100) >>> at >>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120) >>> at >>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125) >>> at >>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48) >>> at >>> org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666) >>> at >>> org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60) >>> at >>> statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49) >>> at >>> io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399) >>> at >>> io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481) >>> at >>> statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44) >>> at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387) >>> at >>> io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256) >>> at >>> io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136) >>> at >>> io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162) >>> at >>> io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100) >>> at >>> io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57) >>> at >>> org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92) >>> at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291) >>> at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286) >>> at >>> org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92) >>> at >>> org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178) >>> at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612) >>> at org.xnio.nio.WorkerThread.run(WorkerThread.java:479) >>> >>> >>> >>> I've been stuck on this for a very long time. Can somebody please advise me >>> as to what I am doing wrong? The framework never even gets to the point >>> where it parses my payload. >>> >>> >