Hi Jan,

The architecture you outlined sounds good, and we have successfully run mixed architectures like this. Let me try to address your questions:

1) To enable checkpointing, you need to set the relevant values in your flink-conf.yaml file:

   execution.checkpointing.interval: <duration> (see [1])
   state.checkpoints.dir: <path> (see [2])

You can take a look at [3] for an example. The easiest way to incorporate the changes is to add your custom flink-conf.yaml to your Docker image (here is an example [4]). Once you move to Kubernetes, you can mount a ConfigMap as flink-conf.yaml; check out the Helm charts at [5].

2) When the remote function is unavailable, StateFun buffers the messages addressed to it for up to the specified timeout (1 minute by default; you can set it as described in [6]) before the job is considered failed and is restarted. In your example you seem to be waiting for only 10 seconds, so the messages should be delivered. Do you set function.spec.timeout or .withMaxRequestDuration() to something else?

Good luck!
Igal.

P.S. Consider using the StateFun Docker images [7]; see any of the examples in the statefun repository.

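As a rough illustration of point 1), the checkpointing-related part of a flink-conf.yaml could look like the fragment below. The two keys are the ones documented in [1] and [2]; the 10s interval is only an assumed placeholder, and the directory is the one from your own config.

```yaml
# Sketch only: the interval value is an assumption, pick one that suits your job.
execution.checkpointing.interval: 10s
# Directory taken from the quoted configuration. With a file:// path it has to be
# accessible to the JobManager and to every TaskManager.
state.checkpoints.dir: file:///checkpoint-dir
```
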
[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-checkpoints-dir
[3] https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
[4] https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile#L20
[5] https://github.com/apache/flink-statefun/tree/master/tools/k8s
[6] look for function.spec.timeout at https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/index.html
[7] https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#images

On Thu, Nov 5, 2020 at 3:35 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

> Hi,
>
> I'm currently trying to set up a Flink Stateful Functions Job with the
> following architecture:
>
> * Kinesis Ingress (embedded)
>
> * Stateful Function (embedded) that calls to and takes responses from an
>   external business logic function (python worker similar to the one in
>   the python greeter example)
>
> * Kinesis Egress (embedded)
>
>
> For the time being I am working with a local docker-compose cluster, but
> the goal would be to move this to kubernetes for production. The stream
> processing itself is working fine, but I can't solve two problems with
> respect to Fault Tolerance:
>
> 1) The app is not writing checkpoints or savepoints at all (rocksDB,
> local filesystem). A checkpoint dir is created on startup but stays
> empty the whole time. When stopping the job, a savepoint dir is created
> but the stop ultimately fails with a
> java.util.concurrent.TimeoutException and the job continues to run.
>
> 2) When I try and simulate failure in the external Function
> ("docker-compose stop python-worker && sleep 10 && docker-compose start
> python-worker"), I lose all messages in between restarts. Although, the
> documentation states that "For both state and messaging, Stateful
> Functions is able to provide the exactly-once guarantees users expect
> from a modern data processing framework".
>
> See the relevant parts of my configs below.
>
> Any input or help would be greatly appreciated.
>
>
> Best regards
>
> Jan
>
>
> ------
>
> flink-conf.yaml
>
> -------
>
> jobmanager.rpc.address: jobmanager
> jobmanager.rpc.port: 6123
> jobmanager.memory.process.size: 1600m
> taskmanager.memory.process.size: 1728m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1
> state.backend: rocksdb
> state.backend.rocksdb.timer-service.factory: ROCKSDB
> state.checkpoints.dir: file:///checkpoint-dir
> state.savepoints.dir: file:///checkpoint-dir
> jobmanager.execution.failover-strategy: region
> blob.server.port: 6124
> query.server.port: 6125
> classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
>
> --------
>
> docker-compose.yaml
>
> -------
>
> jobmanager:
>   image: flink:1.11.2-scala_2.12-java8
>   expose:
>     - "6123"
>   ports:
>     - "8082:8081"
>   volumes:
>     - ./streamProcessor/checkpoint-dir:/checkpoint-dir
>     - ./streamProcessor/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml:ro
>   command: jobmanager
>   environment:
>     - JOB_MANAGER_RPC_ADDRESS=jobmanager
>     - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
> taskmanager:
>   image: flink:1.11.2-scala_2.12-java8
>   expose:
>     - "6121"
>     - "6122"
>   depends_on:
>     - jobmanager
>   command: taskmanager
>   links:
>     - "jobmanager:jobmanager"
>   environment:
>     - JOB_MANAGER_RPC_ADDRESS=jobmanager
>     - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"