No problem, good luck with your learning journey!

Best,
Tymur Yarosh
17 квіт. 2022 р., 07:17 +0300, Marco Villalobos <mvillalo...@kineteque.com>, 
писав(-ла):
> You're right. I didn't notice that the ports were different. That was very 
> subtle.
>
> Thank you for pointing this out to me. I was stuck on it for quite a while.
>
>
> > On Apr 16, 2022, at 6:17 PM, Marco Villalobos <mvillalo...@kineteque.com> 
> > wrote:
> >
> > I'm sorry, I accidentally hit send before I was finished.
> >
> > Yes, I understand that Flink Stateful Functions run as part of a cluster in 
> > which state and throughput are managed by Flink. However, in 
> > StatefulFunctions, remote functions are still
> > implemented as remote HTTP micro-services often deployed as part of a 
> > Lambda deployment.
> >
> > However, the examples that are linked to Flink stateful functions, such as 
> > those in the Flink Stateful Function playround show HTTP as an entry point 
> > into the application.
> >
> > I can't get that to work, and I am not sure if its even correct.
> >
> > > On Apr 16, 2022, at 6:12 PM, Marco Villalobos <mvillalo...@kineteque.com> 
> > > wrote:
> > >
> > > IK
> > >
> > > If what you're saying is true, then why do most of the examples in the 
> > > flink-statefun-playground example use HTTP as an alternative entry point?
> > >
> > > Here is the greeter example:
> > >
> > > https://github.com/apache/flink-statefun-playground/tree/main/java/greeter
> > >
> > >
> > >
> > > > On Apr 16, 2022, at 6:06 PM, Tymur Yarosh <ti.yar...@gmail.com> wrote:
> > > >
> > > > Hi Marco,
> > > > The problem is that you’re trying to call the function directly via 
> > > > HTTP. This is not how it's supposed to work. Instead, check out how to 
> > > > define your statefun module 
> > > > https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/
> > > >  and how to deploy Stateful Functions 
> > > > https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/overview.
> > > >  The Stateful Functions application consists of Flink Cluster, 
> > > > ingresses to consume incoming data, egresses to produce outgoing data 
> > > > and the HTTP server that serves remote Stateful Functions. Also you can 
> > > > checkout the full example here 
> > > > https://medium.com/devoops-and-universe/part-ii-building-next-gen-event-driven-application-powered-by-stateful-functions-a3139f299736
> > > >
> > > > Best,
> > > > Tymur Yarosh
> > > > 14 квіт. 2022 р., 03:51 +0300, Marco Villalobos 
> > > > <mvillalo...@kineteque.com>, писав(-ла):
> > > > > I'm trying to write very simple echo app with Stateful Function to 
> > > > > prove it as a technology for some of my use cases.
> > > > >
> > > > > I have not been able to accept different content types though.  Here 
> > > > > is an example of my code for a simple
> > > > > echo function:
> > > > >
> > > > > My Echo stateful function class.
> > > > >
> > > > > package statefun.implementation.functions;
> > > > >
> > > > > import statefun.api.annotations.Statefun;
> > > > > import statefun.api.factories.StatefulFunctionSpecFactory;
> > > > > import com.fasterxml.jackson.databind.JsonNode;
> > > > > import org.apache.flink.statefun.sdk.java.Context;
> > > > > import org.apache.flink.statefun.sdk.java.StatefulFunction;
> > > > > import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
> > > > > import org.apache.flink.statefun.sdk.java.TypeName;
> > > > > import 
> > > > > org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
> > > > > import org.apache.flink.statefun.sdk.java.message.Message;
> > > > > import org.slf4j.Logger;
> > > > > import org.slf4j.LoggerFactory;
> > > > >
> > > > > import java.util.concurrent.CompletableFuture;
> > > > > import java.util.function.Supplier;
> > > > >
> > > > > @Statefun
> > > > > public class EchoFn implements StatefulFunction, 
> > > > > StatefulFunctionSpecFactory {
> > > > >
> > > > >   private final static Logger logger = 
> > > > > LoggerFactory.getLogger(EchoFn.class);
> > > > >
> > > > >   private final static TypeName fnType = 
> > > > > TypeName.typeNameOf("std.fns", "echo");
> > > > >
> > > > >   public final static StatefulFunctionSpec SPEC =
> > > > >         
> > > > > StatefulFunctionSpec.builder(fnType).withSupplier(EchoFn::new).build();
> > > > >
> > > > >   @Override
> > > > >   public CompletableFuture<Void> apply(Context context, Message 
> > > > > message) throws Throwable {
> > > > >
> > > > >      if (message.is(Types.JAVA_STRING_TYPE)) {
> > > > >         final String request = message.as(Types.JAVA_STRING_TYPE);
> > > > >         
> > > > > context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
> > > > >      } else if (message.is(Types.JSON_TYPE)) {
> > > > >         final JsonNode json = message.as(Types.JSON_TYPE);
> > > > >         final String request = json.asText();
> > > > >         
> > > > > context.send(EgressMessageBuilder.forEgress(fnType).withValue(request).build());
> > > > >      }
> > > > >
> > > > >      return context.done();
> > > > >   }
> > > > >
> > > > >   @Override
> > > > >   public StatefulFunctionSpec createSpec(Supplier<? extends 
> > > > > StatefulFunction> supplier) {
> > > > >      return StatefulFunctionSpec.builder(EchoFn.fnType)
> > > > >            .withSupplier(supplier)
> > > > >            .build();
> > > > >   }
> > > > >
> > > > >   @Override
> > > > >   public String toString() {
> > > > >      return fnType.toString();
> > > > >   }
> > > > > }
> > > > > My Types class, which kind of copies 
> > > > > https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/types/Types.java
> > > > >
> > > > > package statefun.implementation.functions;
> > > > >
> > > > > import com.fasterxml.jackson.databind.JsonNode;
> > > > > import com.fasterxml.jackson.databind.ObjectMapper;
> > > > > import org.apache.flink.statefun.sdk.java.TypeName;
> > > > > import org.apache.flink.statefun.sdk.java.types.SimpleType;
> > > > > import org.apache.flink.statefun.sdk.java.types.Type;
> > > > >
> > > > > public class Types {
> > > > >
> > > > >   public static final String JAVA_LANG_NAMESPACE = "java.lang";
> > > > >   private final static ObjectMapper objectMapper = new ObjectMapper();
> > > > >   public static final SimpleType.Fn<String, byte[]> 
> > > > > javaStringSerialize = input -> input == null ? new byte[0] : 
> > > > > input.getBytes();
> > > > >   public static final SimpleType.Fn<byte[], String> 
> > > > > javaStringDeserialize = input -> input == null || input.length == 0 ? 
> > > > > null : new String(input);
> > > > >
> > > > >   public static final Type<String> JAVA_STRING_TYPE = 
> > > > > SimpleType.simpleImmutableTypeFrom(
> > > > >         TypeName.typeNameOf(JAVA_LANG_NAMESPACE, "String"),
> > > > >         javaStringSerialize,
> > > > >         javaStringDeserialize
> > > > >   );
> > > > >
> > > > >   public static final Type<JsonNode> JSON_TYPE =
> > > > >         SimpleType.simpleImmutableTypeFrom(
> > > > >               TypeName.typeNameOf("application", 
> > > > > JsonNode.class.getName()),
> > > > >               objectMapper::writeValueAsBytes,
> > > > >               bytes -> objectMapper.readValue(bytes, JsonNode.class));
> > > > > }
> > > > >
> > > > > My Undertow Handler, copied from 
> > > > > https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/undertow/UndertowHttpHandler.java
> > > > >
> > > > > package statefun.implementation.undertow;
> > > > >
> > > > > import io.undertow.server.HttpHandler;
> > > > > import io.undertow.server.HttpServerExchange;
> > > > > import io.undertow.util.Headers;
> > > > > import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
> > > > > import org.apache.flink.statefun.sdk.java.slice.Slice;
> > > > > import org.apache.flink.statefun.sdk.java.slice.Slices;
> > > > >
> > > > > import java.util.Objects;
> > > > > import java.util.concurrent.CompletableFuture;
> > > > >
> > > > > /**
> > > > > * A simple Undertow {@link HttpHandler} that delegates requests from 
> > > > > StateFun runtime processes to
> > > > > * a StateFun {@link RequestReplyHandler}.
> > > > > */
> > > > > public final class UndertowHttpHandler implements HttpHandler {
> > > > > private final RequestReplyHandler handler;
> > > > >
> > > > > public UndertowHttpHandler(RequestReplyHandler handler) {
> > > > >    this.handler = Objects.requireNonNull(handler);
> > > > > }
> > > > >
> > > > > @Override
> > > > > public void handleRequest(HttpServerExchange exchange) {
> > > > >    
> > > > > exchange.getRequestReceiver().receiveFullBytes(this::onRequestBody);
> > > > > }
> > > > >
> > > > > private void onRequestBody(HttpServerExchange exchange, byte[] 
> > > > > requestBytes) {
> > > > >    exchange.dispatch();
> > > > >    CompletableFuture<Slice> future = 
> > > > > handler.handle(Slices.wrap(requestBytes));
> > > > >    future.whenComplete((response, exception) -> onComplete(exchange, 
> > > > > response, exception));
> > > > > }
> > > > >
> > > > > private void onComplete(HttpServerExchange exchange, Slice 
> > > > > responseBytes, Throwable ex) {
> > > > >    if (ex != null) {
> > > > >      ex.printStackTrace(System.out);
> > > > >      exchange.getResponseHeaders().put(Headers.STATUS, 500);
> > > > >      exchange.endExchange();
> > > > >      return;
> > > > >    }
> > > > >    exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, 
> > > > > "application/octet-stream");
> > > > >    
> > > > > exchange.getResponseSender().send(responseBytes.asReadOnlyByteBuffer());
> > > > > }
> > > > > }
> > > > >
> > > > > My App Server copied from 
> > > > > https://github.com/apache/flink-statefun-playground/blob/main/java/greeter/src/main/java/org/apache/flink/statefun/playground/java/greeter/GreeterAppServer.java
> > > > > package statefun;
> > > > >
> > > > > import statefun.implementation.functions.EchoFn;
> > > > > import statefun.implementation.undertow.UndertowHttpHandler;
> > > > > import io.undertow.Undertow;
> > > > > import org.apache.flink.statefun.sdk.java.StatefulFunctions;
> > > > > import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
> > > > >
> > > > >
> > > > > public final class StatefulFunctionsAppServer {
> > > > >
> > > > > public static void main(String[] args) {
> > > > >    final StatefulFunctions functions = new StatefulFunctions();
> > > > >    functions.withStatefulFunction(EchoFn.SPEC);
> > > > >
> > > > >    final RequestReplyHandler requestReplyHandler = 
> > > > > functions.requestReplyHandler();
> > > > >    final Undertow httpServer =
> > > > >        Undertow.builder()
> > > > >            .addHttpListener(8080, "0.0.0.0")
> > > > >            .setHandler(new UndertowHttpHandler(requestReplyHandler))
> > > > >            .build();
> > > > >    httpServer.start();
> > > > > }
> > > > > }
> > > > >
> > > > >
> > > > > However, when I run
> > > > >
> > > > > curl -X POST -H "Content-Type: java.lang/String" -d 'Hello World' 
> > > > > http://localhost:8080/std.fns/echo
> > > > >
> > > > > or
> > > > >
> > > > > curl -X POST -H "Content-Type: 
> > > > > application/com.fasterxml.jackson.databind.JsonNode" -d '{"name": 
> > > > > "Hello World"}' http://localhost:8080/std.fns/echo
> > > > >
> > > > > I get the following error on the server.
> > > > >
> > > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException:
> > > > >  While parsing a protocol message, the input ended unexpectedly in 
> > > > > the middle of a field.  This could mean either that the input has 
> > > > > been truncated or that an embedded message misreported its own length.
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:84)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readRawLittleEndian64(CodedInputStream.java:1151)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.CodedInputStream$ArrayDecoder.readFixed64(CodedInputStream.java:767)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:503)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:298)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:65)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.<init>(ToFunction.java:14)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4090)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.reqreply.generated.ToFunction$1.parsePartialFrom(ToFunction.java:4084)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:100)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:120)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:125)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.parseFrom(ToFunction.java:3666)
> > > > >       at 
> > > > > org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler.handle(ConcurrentRequestReplyHandler.java:60)
> > > > >       at 
> > > > > statefun.implementation.undertow.UndertowHttpHandler.onRequestBody(UndertowHttpHandler.java:49)
> > > > >       at 
> > > > > io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:399)
> > > > >       at 
> > > > > io.undertow.io.AsyncReceiverImpl.receiveFullBytes(AsyncReceiverImpl.java:481)
> > > > >       at 
> > > > > statefun.implementation.undertow.UndertowHttpHandler.handleRequest(UndertowHttpHandler.java:44)
> > > > >       at 
> > > > > io.undertow.server.Connectors.executeRootHandler(Connectors.java:387)
> > > > >       at 
> > > > > io.undertow.server.protocol.http.HttpReadListener.handleEventWithNoRunningRequest(HttpReadListener.java:256)
> > > > >       at 
> > > > > io.undertow.server.protocol.http.HttpReadListener.handleEvent(HttpReadListener.java:136)
> > > > >       at 
> > > > > io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:162)
> > > > >       at 
> > > > > io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:100)
> > > > >       at 
> > > > > io.undertow.server.protocol.http.HttpOpenListener.handleEvent(HttpOpenListener.java:57)
> > > > >       at 
> > > > > org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
> > > > >       at 
> > > > > org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:291)
> > > > >       at 
> > > > > org.xnio.ChannelListeners$10.handleEvent(ChannelListeners.java:286)
> > > > >       at 
> > > > > org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
> > > > >       at 
> > > > > org.xnio.nio.QueuedNioTcpServer2.acceptTask(QueuedNioTcpServer2.java:178)
> > > > >       at org.xnio.nio.WorkerThread.safeRun(WorkerThread.java:612)
> > > > >       at org.xnio.nio.WorkerThread.run(WorkerThread.java:479)
> > > > >
> > > > >
> > > > >
> > > > > I've been stuck on this for a very long time. Can somebody please 
> > > > > advise me as to what I am doing wrong? The framework never even gets 
> > > > > to the point where it parses my payload.
> > > > >
> > > > >
> > >
> >
>

Reply via email to