The example uses the playground's ingress (https://github.com/apache/flink-statefun-playground/blob/1ff449204b367e6dd0d0818ca76a5283890ce2c5/java/greeter/module.yaml#L23), which handles HTTP calls and forwards the data to Flink. Flink then calls the function over HTTP on a different server. In the example, the functions server and the server that handles incoming requests listen on different ports.
Best,
Tymur Yarosh

On 17 Apr 2022, 04:17 +0300, Marco Villalobos <mvillalo...@kineteque.com> wrote:
> I'm sorry, I accidentally hit send before I was finished.
>
> Yes, I understand that Flink Stateful Functions run as part of a cluster in
> which state and throughput are managed by Flink. However, in
> StatefulFunctions, remote functions are still implemented as remote HTTP
> micro-services, often deployed as part of a Lambda deployment.
>
> However, the examples that are linked to Flink Stateful Functions, such as
> those in the Flink Stateful Functions playground, show HTTP as an entry point
> into the application.
>
> I can't get that to work, and I am not sure if it's even correct.
>
> > On Apr 16, 2022, at 6:12 PM, Marco Villalobos <mvillalo...@kineteque.com> wrote:
> >
> > IK
> >
> > If what you're saying is true, then why do most of the examples in the
> > flink-statefun-playground example use HTTP as an alternative entry point?
> >
> > Here is the greeter example:
> >
> > https://github.com/apache/flink-statefun-playground/tree/main/java/greeter
> >
> > > On Apr 16, 2022, at 6:06 PM, Tymur Yarosh <ti.yar...@gmail.com> wrote:
> > >
> > > Hi Marco,
> > > The problem is that you're trying to call the function directly via HTTP.
> > > This is not how it's supposed to work. Instead, check out how to define
> > > your statefun module
> > > https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/
> > > and how to deploy Stateful Functions
> > > https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview.
> > > The Stateful Functions application consists of a Flink cluster, ingresses
> > > to consume incoming data, egresses to produce outgoing data, and the HTTP
> > > server that serves remote Stateful Functions.
> > > Also you can check out the full example here
> > > https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736
> > >
> > > Best,
> > > Tymur Yarosh
> > > On 14 Apr 2022, 03:51 +0300, Marco Villalobos <mvillalo...@kineteque.com> wrote:
> > > > I'm trying to write a very simple echo app with Stateful Functions to
> > > > prove it as a technology for some of my use cases.
> > > >
> > > > I have not been able to accept different content types though. Here is
> > > > an example of my code for a simple echo function:
> > > >
> > > > My Echo stateful function class.
> > > >
> > > > package statefun.implementation.functions;
> > > >
> > > > import statefun.api.annotations.Statefun;
> > > > import statefun.api.factories.StatefulFunctionSpecFactory;
> > > > import com.fasterxml.jackson.databind.JsonNode;
> > > > import org.apache.flink.statefun.sdk.java.Context;
> > > > import org.apache.flink.statefun.sdk.java.StatefulFunction;
> > > > import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
> > > > import org.apache.flink.statefun.sdk.java.TypeName;
> > > > import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
> > > > import org.apache.flink.statefun.sdk.java.message.Message;
> > > > import org.slf4j.Logger;
> > > > import org.slf4j.LoggerFactory;
> > > >
> > > > import java.util.concurrent.CompletableFuture;
> > > > import java.util.function.Supplier;
> > > >
> > > > @Statefun
> > > > public class EchoFn implements StatefulFunction, StatefulFunctionSpecFactory {
> > > >
> > > >   private final static Logger logger = LoggerFactory.getLogger(EchoFn.class);
> > > >
> > > >   private final static TypeName fnType = TypeName.typeNameOf("std.fns", "echo");
> > > >
> > > >   public final static StatefulFunctionSpec SPEC =
> > > >       StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build();
> > > >
> > > >   @Override
> > > >   public CompletableFuture<Void> apply(Context context, Message message) throws Throwable {
> > > >
> > > >     if (message.is(Types.JAVA_STRING_TYPE)) {
> > > >       final String request = message.as(Types.JAVA_STRING_TYPE);
> > > >
> > > >       context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
> > > >     } else if (message.is(Types.JSON_TYPE)) {
> > > >       final JsonNode json = message.as(Types.JSON_TYPE);
> > > >       final String request = json.asText();
> > > >
> > > >       context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
> > > >     }
> > > >
> > > >     return context.done();
> > > >   }
> > > >
> > > >   @Override
> > > >   public StatefulFunctionSpec createSpec(Supplier<? extends StatefulFunction> supplier) {
> > > >     return StatefulFunctionSpec.builder(EchoFn.fnType)
> > > >         .withSupplier(supplier)
> > > >         .build();
> > > >   }
> > > >
> > > >   @Override
> > > >   public String toString() {
> > > >     return fnType.toString();
> > > >   }
> > > > }
> > > > My Types class, which kind of copies
> > > > https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
> > > >
> > > > package statefun.implementation.functions;
> > > >
> > > > import com.fasterxml.jackson.databind.JsonNode;
> > > > import com.fasterxml.jackson.databind.ObjectMapper;
> > > > import org.apache.flink.statefun.sdk.java.TypeName;
> > > > import org.apache.flink.statefun.sdk.java.types.SimpleType;
> > > > import org.apache.flink.statefun.sdk.java.types.Type;
> > > >
> > > > public class Types {
> > > >
> > > >   public static final String JAVA_LANG_NAMESPACE = "java.lang";
> > > >   private final static ObjectMapper objectMapper = new ObjectMapper();
> > > >   public static final SimpleType.Fn<String, byte[]> javaStringSerialize =
> > > >       input -> input == null ?
> > > >       new byte[0] : input.getBytes();
> > > >   public static final SimpleType.Fn<byte[], String> javaStringDeserialize =
> > > >       input -> input == null || input.length == 0 ? null : new String(input);
> > > >
> > > >   public static final Type<String> JAVA_STRING_TYPE =
> > > >       SimpleType.simpleImmutableTypeFrom(
> > > >           TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"),
> > > >           javaStringSerialize,
> > > >           javaStringDeserialize
> > > >       );
> > > >
> > > >   public static final Type<JsonNode> JSON_TYPE =
> > > >       SimpleType.simpleImmutableTypeFrom(
> > > >           TypeName.typeNameOf("application", JsonNode.class.getName()),
> > > >           objectMapper::writeValueAsBytes,
> > > >           bytes -> objectMapper.readValue(bytes, JsonNode.class));
> > > > }
> > > >
> > > > My Undertow Handler, copied from
> > > > https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java
> > > >
> > > > package statefun.implementation.undertow;
> > > >
> > > > import io.undertow.server.HttpHandler;
> > > > import io.undertow.server.HttpServerExchange;
> > > > import io.undertow.util.Headers;
> > > > import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
> > > > import org.apache.flink.statefun.sdk.java.slice.Slice;
> > > > import org.apache.flink.statefun.sdk.java.slice.Slices;
> > > >
> > > > import java.util.Objects;
> > > > import java.util.concurrent.CompletableFuture;
> > > >
> > > > /**
> > > >  * A simple Undertow {@link HttpHandler} that delegates requests from StateFun runtime processes to
> > > >  * a StateFun {@link RequestReplyHandler}.
> > > >  */
> > > > public final class UndertowHttpHandler implements HttpHandler {
> > > >   private final RequestReplyHandler handler;
> > > >
> > > >   public UndertowHttpHandler(RequestReplyHandler handler) {
> > > >     this.handler = Objects.requireNonNull(handler);
> > > >   }
> > > >
> > > >   @Override
> > > >   public void handleRequest(HttpServerExchange exchange) {
> > > >     exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
> > > >   }
> > > >
> > > >   private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
> > > >     exchange.dispatch();
> > > >     CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
> > > >     future.whenComplete((response, exception) -> onComplete(exchange, response, exception));
> > > >   }
> > > >
> > > >   private void onComplete(HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
> > > >     if (ex != null) {
> > > >       ex.printStackTrace(System.out);
> > > >       exchange.getResponseHeaders().put(Headers.STATUS, 500);
> > > >       exchange.endExchange();
> > > >       return;
> > > >     }
> > > >     exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
> > > >
> > > >     exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
> > > >   }
> > > > }
> > > >
> > > > My App Server copied from
> > > > https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java
> > > > package statefun;
> > > >
> > > > import statefun.implementation.functions.EchoFn;
> > > > import statefun.implementation.undertow.UndertowHttpHandler;
> > > > import io.undertow.Undertow;
> > > > import org.apache.flink.statefun.sdk.java.StatefulFunctions;
> > > > import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
> > > >
> > > >
> > > > public final class StatefulFunctionsAppServer {
> > > >
> > > >   public static void main(String[] args) {
> > > >     final StatefulFunctions functions = new StatefulFunctions();
> > > >     functions.withStatefulFunction(EchoFn.SPEC);
> > > >
> > > >     final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
> > > >     final Undertow httpServer =
> > > >         Undertow.builder()
> > > >             .addHttpListener(8080, "0.0.0.0")
> > > >             .setHandler(new UndertowHttpHandler(requestReplyHandler))
> > > >             .build();
> > > >     httpServer.start();
> > > >   }
> > > > }
> > > >
> > > >
> > > > However, when I run
> > > >
> > > > curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' http://localhost:8080/std.fns/echo
> > > >
> > > > or
> > > >
> > > > curl -X POST -H "Content-Type: application/com.fasterxml.jackson.databind.JsonNode" -d '{"name": "Hello World"}' http://localhost:8080/std.fns/echo
> > > >
> > > > I get the following error on the server.
> > > >
> > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException:
> > > > While parsing a protocol message, the input ended unexpectedly in the
> > > > middle of a field. This could mean either that the input has been
> > > > truncated or that an embedded message misreported its own length.
> > > >   at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
> > > >   at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151)
> > > >   at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767)
> > > >   at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503)
> > > >   at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298)
> > > >   at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65)
> > > >   at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14)
> > > >   at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090)
> > > >   at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084)
> > > >   at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100)
> > > >   at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120)
> > > >   at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125)
> > > >   at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
> > > >   at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666)
> > > >   at org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60)
> > > >   at statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49)
> > > >   at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399)
> > > >   at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481)
> > > >   at statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44)
> > > >   at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387)
> > > >   at io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256)
> > > >   at io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136)
> > > >   at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162)
> > > >   at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100)
> > > >   at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57)
> > > >   at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
> > > >   at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291)
> > > >   at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286)
> > > >   at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
> > > >   at org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178)
> > > >   at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612)
> > > >   at org.xnio.nio.WorkerThread.run(WorkerThread.java:479)
> > > >
> > > >
> > > > I've been stuck on this for a very long time. Can somebody please
> > > > advise me as to what I am doing wrong? The framework never even gets to
> > > > the point where it parses my payload.
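
To illustrate what the stack trace above suggests: the handler passes the raw HTTP body straight to ToFunction.parseFrom, so the function endpoint appears to expect the protobuf envelope sent by the StateFun runtime, not a plain string or JSON body. Below is a minimal, JDK-only sketch of how a protobuf parser would read the first byte of each curl payload as a tag (field number in the upper bits, wire type in the lowest three bits). The class name TagPeek and its helper methods are hypothetical and exist only for this illustration; this is not StateFun code and it does not reproduce the exact failure path in the trace.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only: shows how the first byte of a
// plain-text HTTP body looks when it is interpreted as a protobuf tag.
final class TagPeek {

    // Protobuf wire type names, indexed by wire type number (0..5).
    private static final String[] WIRE_TYPES = {
        "VARINT", "FIXED64", "LENGTH_DELIMITED", "START_GROUP", "END_GROUP", "FIXED32"
    };

    // Field number of a single-byte tag: everything above the low three bits.
    static int fieldNumber(int tagByte) {
        return (tagByte & 0xFF) >>> 3;
    }

    // Wire type of a tag: the lowest three bits.
    static int wireType(int tagByte) {
        return tagByte & 0x07;
    }

    // Describes how a parser would read the first byte of the given body.
    // Assumes the first character is ASCII, so the tag fits in one byte.
    static String describeFirstByte(String body) {
        byte first = body.getBytes(StandardCharsets.UTF_8)[0];
        int wt = wireType(first);
        String name = wt < WIRE_TYPES.length ? WIRE_TYPES[wt] : "INVALID";
        return "field=" + fieldNumber(first) + " wireType=" + name;
    }

    public static void main(String[] args) {
        // The two bodies sent by the curl commands in the original message.
        System.out.println(describeFirstByte("Hello World"));
        // prints "field=9 wireType=VARINT"
        System.out.println(describeFirstByte("{\"name\": \"Hello World\"}"));
        // prints "field=15 wireType=START_GROUP"
    }
}
```

Neither body is a serialized ToFunction message, which is consistent with the parser failing before the function is ever invoked. Sending the request through an ingress defined in module.yaml, as in the playground, should let the runtime build that envelope instead.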