I'm sorry, I accidentally hit send before I was finished.

Yes, I understand that Flink Stateful Functions run as part of a cluster in
which state and throughput are managed by Flink. However, in Stateful Functions,
remote functions are still implemented as remote HTTP microservices, often
deployed as part of a Lambda deployment.

Moreover, the examples for Flink Stateful Functions, such as those in the
Flink Stateful Functions playground, show HTTP as an entry point into the
application.

I can't get that to work, and I am not sure if it's even correct.
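
For context on the error quoted further down: the stack trace shows the SDK's
request-reply handler calling ToFunction.parseFrom on the raw HTTP body, so the
endpoint appears to expect a Protobuf-encoded ToFunction message rather than a
plain string or a JSON document. As a rough illustration (this is not the SDK's
parser, and not an exact reconstruction of the failure), the sketch below
applies the Protobuf tag rule, tag = (field number << 3) | wire type, to the
first byte of each curl body. The class name WireTagPeek and its helper methods
are made up for this note.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper, not part of the StateFun SDK: shows how a Protobuf
// parser would interpret the first byte of a plain-text request body.
final class WireTagPeek {

    // Protobuf wire types, indexed by the low three bits of a tag.
    // Values 6 and 7 are not assigned by the wire format.
    private static final String[] WIRE_TYPES = {
        "VARINT", "FIXED64", "LENGTH_DELIMITED", "START_GROUP",
        "END_GROUP", "FIXED32", "UNASSIGNED(6)", "UNASSIGNED(7)"
    };

    // Field number encoded in a single-byte tag (upper five bits).
    public static int fieldNumber(int tagByte) {
        return (tagByte & 0xFF) >>> 3;
    }

    // Wire type encoded in a tag (lower three bits).
    public static int wireType(int tagByte) {
        return tagByte & 0x07;
    }

    // Describes how the first byte of the body reads as a Protobuf tag.
    public static String describeFirstByte(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        if (bytes.length == 0) {
            return "empty body";
        }
        int tag = bytes[0] & 0xFF;
        if (tag >= 0x80) {
            // High bit set: the tag varint continues in the next byte.
            return "multi-byte tag, not decoded by this sketch";
        }
        return String.format(Locale.ROOT, "0x%02X -> field %d, wire type %s",
            tag, fieldNumber(tag), WIRE_TYPES[wireType(tag)]);
    }

    public static void main(String[] args) {
        // prints: 0x48 -> field 9, wire type VARINT
        System.out.println(describeFirstByte("Hello World"));
        // prints: 0x7B -> field 15, wire type START_GROUP
        System.out.println(describeFirstByte("{\"name\": \"Hello World\"}"));
    }
}
```

Either way, the text bytes get read as field tags and values of a binary
message, which would be consistent with the handler failing before any of the
function code runs.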

> On Apr 16, 2022, at 6:12 PM, Marco Villalobos <mvillalo...@kineteque.com> 
> wrote:
> 
> IK 
> 
> If what you're saying is true, then why do most of the examples in the
> flink-statefun-playground repository use HTTP as an alternative entry point?
> 
> Here is the greeter example:
> 
> https://github.com/apache/flink-statefun-playground/tree/main/java/greeter
> 
> 
> 
>> On Apr 16, 2022, at 6:06 PM, Tymur Yarosh <ti.yar...@gmail.com> wrote:
>> 
>> Hi Marco, 
>> The problem is that you're trying to call the function directly via HTTP.
>> This is not how it's supposed to work. Instead, check out how to define your
>> statefun module
>> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/
>> and how to deploy Stateful Functions
>> https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview
>> A Stateful Functions application consists of a Flink cluster, ingresses to
>> consume incoming data, egresses to produce outgoing data, and the HTTP server
>> that serves remote Stateful Functions. You can also check out the full
>> example here:
>> https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736
>> 
>> Best,
>> Tymur Yarosh
>> On Apr 14, 2022, at 03:51 +0300, Marco Villalobos
>> <mvillalo...@kineteque.com> wrote:
>>> I'm trying to write a very simple echo app with Stateful Functions to prove
>>> out the technology for some of my use cases.
>>> 
>>> I have not been able to accept different content types, though. Here is an
>>> example of my code for a simple echo function:
>>> 
>>> My Echo stateful function class.
>>> 
>>> package statefun.implementation.functions;
>>> 
>>> import statefun.api.annotations.Statefun;
>>> import statefun.api.factories.StatefulFunctionSpecFactory;
>>> import com.fasterxml.jackson.databind.JsonNode;
>>> import org.apache.flink.statefun.sdk.java.Context;
>>> import org.apache.flink.statefun.sdk.java.StatefulFunction;
>>> import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
>>> import org.apache.flink.statefun.sdk.java.TypeName;
>>> import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
>>> import org.apache.flink.statefun.sdk.java.message.Message;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> 
>>> import java.util.concurrent.CompletableFuture;
>>> import java.util.function.Supplier;
>>> 
>>> @Statefun
>>> public class EchoFn implements StatefulFunction, StatefulFunctionSpecFactory {
>>> 
>>>    private final static Logger logger = LoggerFactory.getLogger(EchoFn.class);
>>> 
>>>    private final static TypeName fnType = TypeName.typeNameOf("std.fns", "echo");
>>> 
>>>    public final static StatefulFunctionSpec SPEC =
>>>          StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build();
>>> 
>>>    @Override
>>>    public CompletableFuture<Void> apply(Context context, Message message)
>>>          throws Throwable {
>>> 
>>>       if (message.is(Types.JAVA_STRING_TYPE)) {
>>>          final String request = message.as(Types.JAVA_STRING_TYPE);
>>>          context.send(
>>>                EgressMessageBuilder.forEgress(fnType).withValue(request).build());
>>>       } else if (message.is(Types.JSON_TYPE)) {
>>>          final JsonNode json = message.as(Types.JSON_TYPE);
>>>          final String request = json.asText();
>>>          context.send(
>>>                EgressMessageBuilder.forEgress(fnType).withValue(request).build());
>>>       }
>>> 
>>>       return context.done();
>>>    }
>>> 
>>>    @Override
>>>    public StatefulFunctionSpec createSpec(
>>>          Supplier<? extends StatefulFunction> supplier) {
>>>       return StatefulFunctionSpec.builder(EchoFn.fnType)
>>>             .withSupplier(supplier)
>>>             .build();
>>>    }
>>> 
>>>    @Override
>>>    public String toString() {
>>>       return fnType.toString();
>>>    }
>>> }
>>> 
>>> My Types class, which kind of copies
>>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
>>> 
>>> package statefun.implementation.functions;
>>> 
>>> import com.fasterxml.jackson.databind.JsonNode;
>>> import com.fasterxml.jackson.databind.ObjectMapper;
>>> import org.apache.flink.statefun.sdk.java.TypeName;
>>> import org.apache.flink.statefun.sdk.java.types.SimpleType;
>>> import org.apache.flink.statefun.sdk.java.types.Type;
>>> 
>>> public class Types {
>>> 
>>>    public static final String JAVA_LANG_NAMESPACE = "java.lang";
>>>    private final static ObjectMapper objectMapper = new ObjectMapper();
>>>    public static final SimpleType.Fn<String, byte[]> javaStringSerialize =
>>>          input -> input == null ? new byte[0] : input.getBytes();
>>>    public static final SimpleType.Fn<byte[], String> javaStringDeserialize =
>>>          input -> input == null || input.length == 0 ? null : new String(input);
>>> 
>>>    public static final Type<String> JAVA_STRING_TYPE = SimpleType.simpleImmutableTypeFrom(
>>>          TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"),
>>>          javaStringSerialize,
>>>          javaStringDeserialize
>>>    );
>>> 
>>>    public static final Type<JsonNode> JSON_TYPE =
>>>          SimpleType.simpleImmutableTypeFrom(
>>>                TypeName.typeNameOf("application", JsonNode.class.getName()),
>>>                objectMapper::writeValueAsBytes,
>>>                bytes -> objectMapper.readValue(bytes, JsonNode.class));
>>> }
>>> 
>>> My Undertow Handler, copied from
>>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java
>>> 
>>> package statefun.implementation.undertow;
>>> 
>>> import io.undertow.server.HttpHandler;
>>> import io.undertow.server.HttpServerExchange;
>>> import io.undertow.util.Headers;
>>> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
>>> import org.apache.flink.statefun.sdk.java.slice.Slice;
>>> import org.apache.flink.statefun.sdk.java.slice.Slices;
>>> 
>>> import java.util.Objects;
>>> import java.util.concurrent.CompletableFuture;
>>> 
>>> /**
>>>  * A simple Undertow {@link HttpHandler} that delegates requests from StateFun
>>>  * runtime processes to a StateFun {@link RequestReplyHandler}.
>>>  */
>>> public final class UndertowHttpHandler implements HttpHandler {
>>>   private final RequestReplyHandler handler;
>>> 
>>>   public UndertowHttpHandler(RequestReplyHandler handler) {
>>>     this.handler = Objects.requireNonNull(handler);
>>>   }
>>> 
>>>   @Override
>>>   public void handleRequest(HttpServerExchange exchange) {
>>>     exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
>>>   }
>>> 
>>>   private void onRequestBody(HttpServerExchange exchange, byte[] requestBytes) {
>>>     exchange.dispatch();
>>>     CompletableFuture<Slice> future = handler.handle(Slices.wrap(requestBytes));
>>>     future.whenComplete(
>>>         (response, exception) -> onComplete(exchange, response, exception));
>>>   }
>>> 
>>>   private void onComplete(
>>>       HttpServerExchange exchange, Slice responseBytes, Throwable ex) {
>>>     if (ex != null) {
>>>       ex.printStackTrace(System.out);
>>>       exchange.getResponseHeaders().put(Headers.STATUS, 500);
>>>       exchange.endExchange();
>>>       return;
>>>     }
>>>     exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/octet-stream");
>>>     exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
>>>   }
>>> }
>>> 
>>> My App Server, copied from
>>> https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java
>>> 
>>> package statefun;
>>> 
>>> import statefun.implementation.functions.EchoFn;
>>> import statefun.implementation.undertow.UndertowHttpHandler;
>>> import io.undertow.Undertow;
>>> import org.apache.flink.statefun.sdk.java.StatefulFunctions;
>>> import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
>>> 
>>> 
>>> public final class StatefulFunctionsAppServer {
>>> 
>>>   public static void main(String[] args) {
>>>     final StatefulFunctions functions = new StatefulFunctions();
>>>     functions.withStatefulFunction(EchoFn.SPEC);
>>> 
>>>     final RequestReplyHandler requestReplyHandler = functions.requestReplyHandler();
>>>     final Undertow httpServer =
>>>         Undertow.builder()
>>>             .addHttpListener(8080, "0.0.0.0")
>>>             .setHandler(new UndertowHttpHandler(requestReplyHandler))
>>>             .build();
>>>     httpServer.start();
>>>   }
>>> }
>>> 
>>> 
>>> However, when I run
>>> 
>>> curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' \
>>>   http://localhost:8080/std.fns/echo
>>> 
>>> or 
>>> 
>>> curl -X POST \
>>>   -H "Content-Type: application/com.fasterxml.jackson.databind.JsonNode" \
>>>   -d '{"name": "Hello World"}' http://localhost:8080/std.fns/echo
>>> 
>>> I get the following error on the server.  
>>> 
>>> org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151)
>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767)
>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503)
>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298)
>>>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65)
>>>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14)
>>>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090)
>>>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084)
>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100)
>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120)
>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125)
>>>     at org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
>>>     at org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666)
>>>     at org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60)
>>>     at statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49)
>>>     at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399)
>>>     at io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481)
>>>     at statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44)
>>>     at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387)
>>>     at io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256)
>>>     at io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136)
>>>     at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162)
>>>     at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100)
>>>     at io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57)
>>>     at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
>>>     at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291)
>>>     at org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286)
>>>     at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
>>>     at org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178)
>>>     at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612)
>>>     at org.xnio.nio.WorkerThread.run(WorkerThread.java:479)
>>> 
>>> 
>>> 
>>> I've been stuck on this for a very long time. Can somebody please advise me 
>>> as to what I am doing wrong? The framework never even gets to the point 
>>> where it parses my payload.
>>> 
>>> 
> 
