Hi Barry!

Glad to hear that it works for you! There are two things I didn't quite understand:

a) What is "flink.yaml"? Perhaps you are referring to "flink-conf.yaml"?
b) Why would it be bundled with the distribution jar? I couldn't find it there (nor should it be there). I looked manually with:

  jar tf statefun-flink-distribution-3.1.0.jar | grep "\.yaml"

and there is no such file in the jar.

Generally, flink-conf.yaml is part of your Flink runtime rather than of the job jar, for example a file at /opt/flink/conf/flink-conf.yaml.
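If that is the file you actually edited, then for reference the addition you described would look roughly like this in the cluster's config file (only a sketch: the value is the one from your mail, and the path assumes a standard standalone setup):

  # /opt/flink/conf/flink-conf.yaml
  classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf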
Thanks,
Igal.

On Thu, Sep 23, 2021 at 11:22 AM Barry Higgins <barry.p.higgi...@gmail.com> wrote:

> Hi Igal,
> I just wanted to give you an update on my deployment of stateful functions to an existing Flink cluster.
> The good news is that it works now when I submit my config with the statefun-flink-distribution. Thank you very much for your help.
> There was one gotcha, and that was down to the requirement to update the flink.yaml to include:
>
>   classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
> As the flink.yaml is bundled in the newly created fat jar, there is no easy way to edit it. My hacky solution was to code around that and create a new entrypoint class, which replicated the code in org.apache.flink.statefun.flink.core.StatefulFunctionsJob without the validation code that was stopping my deployment.
> It may be easier if the flink.yaml in the statefun distribution dependency were shipped with the required value in it by default?
> Thanks a million,
> Barry
>
> On 2021/09/10 12:37:32, Igal Shilman <i...@apache.org> wrote:
> > Hello Barry,
> >
> > I assume that by "we don't need another installation of Flink to manage the stateful functions" you mean that you already have a running Flink cluster and you would like to submit an additional Flink job that executes a Stateful Functions application?
> >
> > Then perhaps just try to submit [1] to the Flink cluster. In addition you would have to make the module.yaml available on the class path.
> > You can simply place it under your Flink distribution's /lib directory, or alternatively:
> > 1. Create a jar-with-dependencies (uber jar / fat jar) that has [1] as a dependency,
> > 2. Add the module.yaml definition under src/resources,
> > 3. Make sure that the main class will be org.apache.flink.statefun.flink.core.StatefulFunctionsJob.
> > You should be able to submit it either via the web interface, or by running:
> >
> >   ./bin/flink run -c org.apache.flink.statefun.flink.core.StatefulFunctionsJob ./statefun-example.jar
> >
> > If this approach doesn't work for you, let me know and we will figure out the DataStream integration approach.
> >
> > All the best,
> > Igal.
> >
> > [1] https://mvnrepository.com/artifact/org.apache.flink/statefun-flink-distribution/3.1.0
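(As a quick sanity check of steps 2 and 3 above, something like the following should show the bundled module.yaml and the configured entrypoint; the jar name is only the example name used in the command above, adjust it to your build:)

  jar tf statefun-example.jar | grep module.yaml
  unzip -p statefun-example.jar META-INF/MANIFEST.MF | grep Main-Class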
> >
> > On Thu, Sep 9, 2021 at 5:22 PM Barry Higgins <barry.p.higgi...@gmail.com> wrote:
> >
> > > Hi,
> > > I'm looking at using the DataStream API from a Flink application against a remote Python stateful function deployed on another machine. I would like to investigate how feasible it is to have all of the state management handled from the calling side, meaning that we don't need another installation of Flink to manage the stateful functions.
> > >
> > > Unfortunately the example referenced in the documentation:
> > > https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/sdk/flink-datastream/
> > > no longer exists:
> > > https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java
> > >
> > > There is an older version available here:
> > > https://github.com/apache/flink-statefun/tree/release-2.2/statefun-examples/statefun-flink-datastream-example
> > > and I have tried to work with this without much success.
> > >
> > > The calling element of the code looks as follows:
> > >
> > >   StatefulFunctionEgressStreams out =
> > >       StatefulFunctionDataStreamBuilder.builder("example")
> > >           .withDataStreamAsIngress(names)
> > >           .withFunctionProvider(GREET, unused -> new MyFunction())
> > >           .withRequestReplyRemoteFunction(
> > >               requestReplyFunctionBuilder(
> > >                       REMOTE_GREET, URI.create("http://localhost:5000/statefun"))
> > >                   .withPersistedState("seen_count")
> > >                   .withMaxRequestDuration(Duration.ofSeconds(15))
> > >                   .withMaxNumBatchRequests(500))
> > >           .withEgressId(GREETINGS)
> > >           .withConfiguration(statefunConfig)
> > >           .build(env);
> > >
> > > with a reference to a FunctionProvider that exists as an inner class in the same class. We would like this to be a remote call, where I guess I would replace http://localhost:5000/statefun with the remote address of the SF.
> > > However, when I do make such a change, the code still refers to the inner function, and any changes to the local MyFunction class are returned regardless of what is deployed remotely.
> > >
> > > If anyone has a working example of how to interact via DataStreams with a remotely deployed SF, I would be very grateful. I would be very happy to update the documentation if I can get this working.
> > > Cheers,
> > > Barry
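P.S. Regarding the DataStream question quoted at the bottom: one likely reason the embedded MyFunction keeps answering is that the ingress messages are still addressed to GREET, the function type bound via withFunctionProvider, so the remote binding is never hit. Below is a rough, untested sketch based on the release-2.2 example, with the local provider removed and the messages addressed to the remote type instead. The function type, endpoint URL, egress id, class name and payload handling are placeholders and depend on your remote module, so treat it only as a starting point:

  import java.net.URI;
  import java.time.Duration;

  import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
  import org.apache.flink.statefun.flink.datastream.RoutableMessage;
  import org.apache.flink.statefun.flink.datastream.RoutableMessageBuilder;
  import org.apache.flink.statefun.flink.datastream.StatefulFunctionDataStreamBuilder;
  import org.apache.flink.statefun.flink.datastream.StatefulFunctionEgressStreams;
  import org.apache.flink.statefun.sdk.FunctionType;
  import org.apache.flink.statefun.sdk.io.EgressIdentifier;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  import static org.apache.flink.statefun.flink.datastream.RequestReplyFunctionBuilder.requestReplyFunctionBuilder;

  public class RemoteGreeterExample {

    // Placeholder: must match the namespace/name the remote Python function is registered under.
    private static final FunctionType REMOTE_GREET = new FunctionType("example", "greeter");

    // Placeholder egress id; the remote function has to send to a matching egress.
    private static final EgressIdentifier<String> GREETINGS =
        new EgressIdentifier<>("example", "out", String.class);

    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      StatefulFunctionsConfig statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);

      // Address every ingress element to the REMOTE function type. If the target
      // address pointed at a type bound with withFunctionProvider, the embedded
      // function would keep handling the messages instead of the remote one.
      DataStream<RoutableMessage> names =
          env.fromElements("Alice", "Bob")
              .map(
                  name ->
                      RoutableMessageBuilder.builder()
                          .withTargetAddress(REMOTE_GREET, name)
                          .withMessageBody(name)
                          .build());

      StatefulFunctionEgressStreams out =
          StatefulFunctionDataStreamBuilder.builder("example")
              .withDataStreamAsIngress(names)
              // Note: no withFunctionProvider(...) here, only the remote binding.
              .withRequestReplyRemoteFunction(
                  requestReplyFunctionBuilder(
                          // Placeholder: address of the remote StateFun endpoint.
                          REMOTE_GREET, URI.create("http://remote-host:5000/statefun"))
                      .withPersistedState("seen_count")
                      .withMaxRequestDuration(Duration.ofSeconds(15))
                      .withMaxNumBatchRequests(500))
              .withEgressId(GREETINGS)
              .withConfiguration(statefunConfig)
              .build(env);

      DataStream<String> greetings = out.getDataStreamForEgressId(GREETINGS);
      greetings.print();

      env.execute("remote-statefun-datastream-sketch");
    }
  }

Note that results will only show up on the GREETINGS egress if the remote function actually sends to a matching egress id, and the message body serialization has to line up with what the remote SDK expects; the exact builder API may also differ between StateFun versions.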