Hi Igal,

I just wanted to give you an update on my deployment of Stateful Functions to an existing Flink cluster. The good news is that it now works when I submit my job built with the statefun-flink-distribution. Thank you very much for your help. There was one gotcha, and that was down to the requirement to update flink-conf.yaml to include:

classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
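For anyone else who hits the same validation error, the relevant fragment of flink-conf.yaml would look like this (a sketch; the first two lines are ordinary settings shown only for context, the last line is the one StateFun's validator checks for):

```yaml
# flink-conf.yaml (fragment)
jobmanager.rpc.address: localhost   # placeholder, context only
parallelism.default: 1              # placeholder, context only

# Required by the StateFun runtime's classloading validation:
classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
```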
As the flink-conf.yaml is bundled into the newly created fat jar, there is no easy way to edit it. My hacky solution was to code around that and create a new entrypoint class, which replicated the code in org.apache.flink.statefun.flink.core.StatefulFunctionsJob without the validation code that was stopping my deployment. Might it be easier if the flink-conf.yaml in the statefun distribution dependency were shipped with the required value in it by default?

Thanks a million,
Barry

On 2021/09/10 12:37:32, Igal Shilman <i...@apache.org> wrote:
> Hello Barry,
>
> I assume that by "we don't need another installation of Flink to manage the
> stateful functions" you mean that you already have a running Flink cluster
> and you would like to submit an additional Flink job that executes a
> Stateful Functions application?
>
> Then perhaps just try to submit [1] to the Flink cluster. In addition, you
> would have to make the module.yaml available on the classpath.
> You can simply place it under your Flink distribution's /lib directory, or
> alternatively:
> 1. Create a jar-with-dependencies (uber jar / fat jar) that has [1] as a
> dependency.
> 2. Add the module.yaml definition under src/main/resources.
> 3. Make sure that the main class is
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.
> You should be able to submit it either via the web interface or by running:
>
> ./bin/flink run -c org.apache.flink.statefun.flink.core.StatefulFunctionsJob ./statefun-example.jar
>
> If this approach doesn't work for you, let me know and we will figure out
> the DataStream integration approach.
>
> All the best,
> Igal.
>
> [1]
> https://mvnrepository.com/artifact/org.apache.flink/statefun-flink-distribution/3.1.0
>
>
> On Thu, Sep 9, 2021 at 5:22 PM Barry Higgins <barry.p.higgi...@gmail.com>
> wrote:
>
> > Hi,
> > I'm looking at using the DataStream API from a Flink application against a
> > remote Python stateful function deployed on another machine.
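As an aside for step 2 above, a minimal module.yaml declaring a single remote HTTP endpoint might look like the sketch below. This assumes the StateFun 3.1 multi-document module format; the function namespace (example/*) and the endpoint URL are placeholders, not values from this thread:

```yaml
# module.yaml (sketch, StateFun 3.1 format)
# Placeholders: the "example" namespace and the URL must match your deployment.
kind: io.statefun.endpoints.v2/http
spec:
  functions: example/*
  urlPathTemplate: http://localhost:5000/statefun
```

With this on the classpath (e.g. under src/main/resources of the fat jar), the runtime routes invocations of any function in the example namespace to that HTTP endpoint.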
> > I would like to investigate how feasible it is to have all of the state
> > management handled from the calling side, meaning that we don't need
> > another installation of Flink to manage the stateful functions.
> >
> > Unfortunately the example referenced in the documentation:
> > https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/sdk/flink-datastream/
> > is no longer in existence:
> > https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
> >
> > There is an older version that is available here:
> > https://github.com/apache/flink-statefun/tree/release-2.2/statefun-examples/statefun-flink-datastream-example
> > and I have tried to work with this without much success.
> >
> > The calling element of the code looks as follows:
> >
> > StatefulFunctionEgressStreams out =
> >     StatefulFunctionDataStreamBuilder.builder("example")
> >         .withDataStreamAsIngress(names)
> >         .withFunctionProvider(GREET, unused -> new MyFunction())
> >         .withRequestReplyRemoteFunction(
> >             requestReplyFunctionBuilder(
> >                     REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
> >                 .withPersistedState("seen_count")
> >                 .withMaxRequestDuration(Duration.ofSeconds(15))
> >                 .withMaxNumBatchRequests(500))
> >         .withEgressId(GREETINGS)
> >         .withConfiguration(statefunConfig)
> >         .build(env);
> >
> > with a reference to a FunctionProvider that exists as an inner class in
> > the same class. We would like this to be a remote call, where I guess I
> > would replace http://localhost:5000/statefun with the remote address of
> > the SF.
> > However, when I do make such a change, the code is still referring to the
> > inner function, and any changes to the local MyFunction class are returned
> > regardless of what is deployed remotely.
> >
> > If anyone has a working example of how to interact via DataStreams with a
> > remotely deployed SF, I would be very grateful. I would be very happy to
> > update the documentation if I can get this working.
> > Cheers,
> > Barry
> >