How do you currently deploy the job?
Are you using the DataStream integration, or deploying it as a Flink Jar [1]?

(Also, please note that the directories might be created, but without a
checkpoint interval set they will stay empty.)
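
A minimal sketch of the relevant flink-conf.yaml entries, assuming a 10 second
interval (that value is only an illustration; the other two lines are taken
from the config you posted):

```yaml
# Without an interval, Flink does not trigger periodic checkpoints.
execution.checkpointing.interval: 10s   # assumed value, tune as needed
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir
```

With a file:// path, the directory has to be reachable from both the
JobManager and the TaskManager containers.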

Regarding your two questions:

It is true that you can theoretically share the same cluster to submit
additional jobs besides StateFun.
However, StateFun requires a specific set of configurations that might not
apply to your other jobs.
Considering your end goal of eventually using Kubernetes, the recommended
way is actually to use a cluster per job, and the StateFun Docker images
are a convenient way to package your modules.
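
On Kubernetes, the per-job flink-conf.yaml can then be supplied through a
config map that is mounted into the containers. A rough sketch, where the
name "flink-config" and all values are assumptions rather than what the
helm charts actually use:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config            # hypothetical name
data:
  flink-conf.yaml: |
    execution.checkpointing.interval: 10s              # assumed value
    state.backend: rocksdb
    state.checkpoints.dir: file:///checkpoint-dir      # needs storage shared by all pods
```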

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#flink-jar


On Thu, Nov 5, 2020 at 5:29 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

> Hi Igal,
>
> thanks for your quick and detailed reply! For me, this is the really great
> defining feature of Stateful Functions: Separating StreamProcessing
> "Infrastructure" from Business Logic Code, possibly maintained by a
> different team.
>
> Regarding your points: I did add the checkpoint interval to the flink-conf,
> to no avail. state.checkpoints.dir was already set and all the necessary
> subfolders get created on job startup. They just stay empty...
>
> Thanks for the pointer to the helm charts! Just what I was looking for!
>
> A question regarding StateFun docker images: I would actually prefer using
> them but my fear is that they would take away my options to:
>
> 1) deploy a new release of my StateFun job without killing the cluster,
> because...
>
> 2) ... I would like to schedule regular flink jobs or additional StateFun
> jobs on the same cluster alongside my original job.
>
> Could you give a quick opinion if these fears are even true and if so,
> what would be a recommended setup to satisfy these use cases?
>
>
> Best regards
>
> Jan
>
>
> On 05.11.20 17:02, Igal Shilman wrote:
>
> Hi Jan,
>
> The architecture you outlined sounds good, and we've successfully run
> mixed architectures like this.
> Let me try to address your questions:
>
> 1)
> To enable checkpointing you need to set the relevant values in your
> flink-conf.yaml file.
> execution.checkpointing.interval: <duration> (see [1])
> state.checkpoints.dir: <path> (see [2])
>
> You can take a look here for an example [3]. The easiest way to
> incorporate the changes would be to add your custom flink-conf.yaml into
> your docker image (here is an example [4]).
> When you move to Kubernetes, you can mount a config map as
> flink-conf.yaml; check out the helm charts here: [5]
>
> 2)
> When the remote function is unavailable, StateFun buffers the
> messages addressed to it, up to the specified
> timeout (the default is 1 minute, you can set it here [6]), before the
> job is considered failed and is restarted.
> It seems like in your example you are waiting for 10 seconds, so the
> messages should be delivered.
> Do you set function.spec.timeout or .withMaxRequestDuration() to something
> else?
>
>
> Good luck!
> Igal.
>
> p.s,
> Consider using StateFun docker images[7], see any of the examples in the
> statefun repository.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#execution-checkpointing-interval
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#state-checkpoints-dir
> [3]
> https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-e2e-tests-common/src/main/resources/flink-conf.yaml
> [4]
> https://github.com/apache/flink-statefun/blob/master/statefun-e2e-tests/statefun-sanity-e2e/src/test/resources/Dockerfile#L20
> [5] https://github.com/apache/flink-statefun/tree/master/tools/k8s
> [6] look for function.spec.timeout at
> https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/index.html
> [7]
> https://ci.apache.org/projects/flink/flink-statefun-docs-master/deployment-and-operations/packaging.html#images
>
> On Thu, Nov 5, 2020 at 3:35 PM Jan Brusch <jan.bru...@neuland-bfi.de>
> wrote:
>
>> Hi,
>>
>> I'm currently trying to set up a Flink Stateful Functions Job with the
>> following architecture:
>>
>> * Kinesis Ingress (embedded)
>>
>> * Stateful Function (embedded) that calls to and takes responses from an
>> external business logic function (python worker similar to the one in
>> the python greeter example)
>>
>> * Kinesis Egress (embedded)
>>
>>
>> For the time being I am working with a local docker-compose cluster, but
>> the goal would be to move this to kubernetes for production. The stream
>> processing itself is working fine, but I can't solve two problems with
>> respect to Fault Tolerance:
>>
>> 1) The app is not writing checkpoints or savepoints at all (rocksDB,
>> local filesystem). A checkpoint dir is created on startup but stays
>> empty the whole time. When stopping the job, a savepoint dir is created
>> but the stop ultimately fails with a
>> java.util.concurrent.TimeoutException and the job continues to run.
>>
>> 2) When I try and simulate failure in the external Function
>> ("docker-compose stop python-worker && sleep 10 && docker-compose start
>> python-worker"), I lose all messages in between restarts. Although, the
>> documentation states that "For both state and messaging, Stateful
>> Functions is able to provide the exactly-once guarantees users expect
>> from a modern data processing framework".
>>
>> See the relevant parts of my configs below.
>>
>> Any input or help would be greatly appreciated.
>>
>>
>> Best regards
>>
>> Jan
>>
>>
>> ------
>>
>> flink-conf.yaml
>>
>> -------
>>
>> jobmanager.rpc.address: jobmanager
>> jobmanager.rpc.port: 6123
>> jobmanager.memory.process.size: 1600m
>> taskmanager.memory.process.size: 1728m
>> taskmanager.numberOfTaskSlots: 1
>> parallelism.default: 1
>> state.backend: rocksdb
>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>> state.checkpoints.dir: file:///checkpoint-dir
>> state.savepoints.dir: file:///checkpoint-dir
>> jobmanager.execution.failover-strategy: region
>> blob.server.port: 6124
>> query.server.port: 6125
>> classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>
>>
>> --------
>>
>> docker-compose.yaml
>>
>> -------
>>
>>    jobmanager:
>>      image: flink:1.11.2-scala_2.12-java8
>>      expose:
>>        - "6123"
>>      ports:
>>        - "8082:8081"
>>      volumes:
>>        - ./streamProcessor/checkpoint-dir:/checkpoint-dir
>>        - ./streamProcessor/conf/flink-conf.yaml:/opt/flink/conf/flink-conf.yaml:ro
>>      command: jobmanager
>>      environment:
>>        - JOB_MANAGER_RPC_ADDRESS=jobmanager
>>        - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
>>    taskmanager:
>>      image: flink:1.11.2-scala_2.12-java8
>>      expose:
>>        - "6121"
>>        - "6122"
>>      depends_on:
>>        - jobmanager
>>      command: taskmanager
>>      links:
>>        - "jobmanager:jobmanager"
>>      environment:
>>        - JOB_MANAGER_RPC_ADDRESS=jobmanager
>>        - "FLINK_PROPERTIES=classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"
>>
>> --
> neuland  – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99
> https://www.neuland-bfi.de
> https://twitter.com/neuland
> https://facebook.com/neulandbfi
> https://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
>
>
