How do you deploy the job currently? Are you using the DataStream integration, or a Flink jar [1]?
(Also please note that the directories might be created, but without a checkpoint interval set they will stay empty.)

Regarding your two questions: it is true that you can theoretically share the same cluster to submit additional jobs besides StateFun. However, StateFun requires a specific set of configurations that might not apply to your other jobs. Considering your end goal of eventually using Kubernetes, the recommended way is actually a cluster per job, and StateFun docker images are a convenient way to package your modules.

[1] https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#flink-jar

On Thu, Nov 5, 2020 at 5:29 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

> Hi Igal,
>
> thanks for your quick and detailed reply! For me, this is the really great
> defining feature of Stateful Functions: separating stream processing
> "infrastructure" from business logic code, possibly maintained by a
> different team.
>
> Regarding your points: I did add the checkpoint interval to the flink-conf,
> to no avail. state.checkpoints.dir was already set, and all the necessary
> subfolders get created on job startup. They just stay empty...
>
> Thanks for the pointer to the helm charts! Just what I was looking for!
>
> A question regarding StateFun docker images: I would actually prefer using
> them, but my fear is that they would take away my options to:
>
> 1) deploy a new release of my StateFun job without killing the cluster,
> because...
>
> 2) ... I would like to schedule regular Flink jobs or additional StateFun
> jobs on the same cluster alongside my original job.
>
> Could you give a quick opinion on whether these fears are justified and,
> if so, what would be a recommended setup to satisfy these use cases?
>
> Best regards
>
> Jan
>
> On 05.11.20 17:02, Igal Shilman wrote:
>
> Hi Jan,
>
> The architecture outlined by you sounds good, and we've successfully run
> mixed architectures like this.
> Let me try to address your questions:
>
> 1)
> To enable checkpointing you need to set the relevant values in your
> flink-conf.yaml file:
>
> execution.checkpointing.interval: <duration> (see [1])
> state.checkpoints.dir: <path> (see [2])
>
> You can take a look here for an example [3]. The easiest way to
> incorporate the changes would be to add your custom flink-conf.yaml to
> your docker image (here is an example [4]).
> When you are using Kubernetes, you can mount a config map as
> flink-conf.yaml; check out the helm charts here: [5]
>
> 2)
> When the remote function is unavailable, StateFun buffers the messages
> addressed to it, up to the specified timeout (the default is 1 minute;
> you can set it here [6]), before the job is considered failed and is
> restarted.
> It seems like in your example you are waiting for 10 seconds, so the
> messages should be delivered.
> Do you set function.spec.timeout or .withMaxRequestDuration() to
> something else?
>
> Good luck!
> Igal.
>
> p.s.
> Consider using StateFun docker images [7]; see any of the examples in
> the statefun repository.
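For reference, a minimal flink-conf.yaml fragment combining the two settings named above; the interval and path are placeholders for illustration, not values taken from this thread:

```yaml
# Enable periodic checkpoints -- without this setting, the checkpoint
# directory is created on startup but stays empty (placeholder interval).
execution.checkpointing.interval: 10s
# Where completed checkpoints are written (placeholder path).
state.checkpoints.dir: file:///checkpoint-dir
```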
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-checkpoints-dir
> [3] https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
> [4] https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile#L20
> [5] https://github.com/apache/flink-statefun/tree/master/tools/k8s
> [6] look for function.spec.timeout at https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/index.html
> [7] https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#images
>
> On Thu, Nov 5, 2020 at 3:35 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:
>
>> Hi,
>>
>> I'm currently trying to set up a Flink Stateful Functions job with the
>> following architecture:
>>
>> * Kinesis ingress (embedded)
>>
>> * Stateful function (embedded) that calls to and takes responses from an
>> external business logic function (a Python worker similar to the one in
>> the Python greeter example)
>>
>> * Kinesis egress (embedded)
>>
>> For the time being I am working with a local docker-compose cluster, but
>> the goal would be to move this to Kubernetes for production. The stream
>> processing itself is working fine, but I can't solve two problems with
>> respect to fault tolerance:
>>
>> 1) The app is not writing checkpoints or savepoints at all (RocksDB,
>> local filesystem). A checkpoint dir is created on startup but stays
>> empty the whole time. When stopping the job, a savepoint dir is created,
>> but the stop ultimately fails with a
>> java.util.concurrent.TimeoutException and the job continues to run.
>>
>> 2) When I try to simulate failure in the external function
>> ("docker-compose stop python-worker && sleep 10 && docker-compose start
>> python-worker"), I lose all messages in between restarts, although the
>> documentation states that "For both state and messaging, Stateful
>> Functions is able to provide the exactly-once guarantees users expect
>> from a modern data processing framework".
>>
>> See the relevant parts of my configs below.
>>
>> Any input or help would be greatly appreciated.
>>
>> Best regards
>>
>> Jan
>>
>> ------
>> flink-conf.yaml
>> ------
>>
>> jobmanager.rpc.address: jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.memory.process.size: 1600m
>> taskmanager.memory.process.size: 1728m
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 1
>> state.backend: rocksdb
>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>> state.checkpoints.dir: file:///checkpoint-dir
>> state.savepoints.dir: file:///checkpoint-dir
>> jobmanager.execution.failover-strategy: region
>> blob.server.port: 6124
>> query.server.port: 6125
>> classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>
>> ------
>> docker-compose.yaml
>> ------
>>
>> jobmanager:
>>   image: flink:1.11.2-scala_2.12-java8
>>   expose:
>>     - "6123"
>>   ports:
>>     - "8082:8081"
>>   volumes:
>>     - ./streamProcessor/checkpoint-dir:/checkpoint-dir
>>     - ./streamProcessor/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml:ro
>>   command: jobmanager
>>   environment:
>>     - JOB_MANAGER_RPC_ADDRESS=jobmanager
>>     - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
>> taskmanager:
>>   image: flink:1.11.2-scala_2.12-java8
>>   expose:
>>     - "6121"
>>     - "6122"
>>   depends_on:
>>     - jobmanager
>>   command: taskmanager
>>   links:
>>     - "jobmanager:jobmanager"
>>   environment:
>>     - JOB_MANAGER_RPC_ADDRESS=jobmanager
>>     - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
>
> --
> neuland – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99
> https://www.neuland-bfi.de
> https://twitter.com/neuland
> https://facebook.com/neulandbfi
> https://xing.com/company/neulandbfi
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
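The image-based packaging recommended in the replies above ([1] and [7]) can be sketched as a short Dockerfile; the base image tag, module directory name, and jar path below are illustrative assumptions, not values from this thread:

```dockerfile
# Sketch: package an embedded StateFun module into a StateFun image.
# Base image tag, module name, and jar name are placeholders.
FROM apache/flink-statefun:2.2.0

# StateFun discovers modules placed under /opt/statefun/modules/<name>/.
RUN mkdir -p /opt/statefun/modules/my-module
COPY target/my-statefun-job.jar /opt/statefun/modules/my-module/
```

With this layout, one such image per job pairs naturally with the cluster-per-job setup suggested above.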