Are you setting --checkpointingInterval? I have seen similar behavior, but
only when checkpointing was disabled (i.e. the option was missing); see [1].
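If it is missing, you can pass it on the command line
(--checkpointingInterval=10000) or set it programmatically. A minimal
sketch, assuming the Beam FlinkRunner; the 10-second interval is only an
illustration, tune it for your job:

```
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointedPipeline {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // Same effect as passing --checkpointingInterval=10000 (milliseconds).
    options.setCheckpointingInterval(10_000L);
    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline, then:
    pipeline.run();
  }
}
```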
[1] https://issues.apache.org/jira/browse/BEAM-12053
On 9/30/21 2:14 PM, Marco Costantini wrote:
Thanks Jan,
More than continuing safely, I need to be able to stop the jobs
safely. Currently, doing so "blows up" the TaskManager. By "blows up"
I mean that the exceptions stream so fast that the TaskManager shuts
down for a reason I haven't been able to observe: OOM? Disk space?
If I connect to Kafka with KafkaIO and then click Cancel Job -> boom,
the exceptions start streaming into the logs.
I've tried 'stopping' the job via the REST API, but it responds with
something like "the module is already in Finished state. Not stopping".
It is correct in that one of my two pipeline stages is in the FINISHED
state, but the other is still RUNNING.
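(For reference, I believe I'm hitting the stop-with-savepoint endpoint,
roughly `POST http://<jobmanager>:8081/jobs/<job-id>/stop`; the host and
job id here are placeholders.)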
Any tips to clean this mess up?
On Thu, Sep 30, 2021 at 3:30 AM Jan Lukavský <[email protected]> wrote:
Hi Marco,
what is your intention? Do you want to upgrade the pipeline? Flink uses
checkpoints / savepoints (see [1]), so cancelling the pipeline with a
savepoint and then resuming from that savepoint should be safe.
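With the Flink CLI this is roughly `flink stop --savepointPath
<savepoint-dir> <job-id>` to stop, and `flink run -s <savepoint-path> ...`
to resume; the directory and job id here are placeholders.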
Another option is to enable offset commits to Kafka via [2]. That way you
should be able to resume even without a savepoint (but you will lose any
internal pipeline state, so that is mostly useful for stateless pipelines).
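A minimal sketch of [2]; the brokers, topic and group id below are
placeholders:

```
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetCommittingRead {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    pipeline.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")
        .withTopic("my-topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // commitOffsetsInFinalize() needs a consumer group to commit to.
        .withConsumerConfigUpdates(
            Collections.<String, Object>singletonMap(
                ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
        // Offsets are committed when bundles are finalized, so a restarted
        // job resumes near where it left off (no internal state restored).
        .commitOffsetsInFinalize());
    pipeline.run();
  }
}
```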
Would that work for you?
Jan
[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
[2] https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
On 9/30/21 12:28 AM, Marco Costantini wrote:
> Using a FlinkRunner, if I cancel the job, the pipeline blows up.
> Exceptions stream across my screen rapidly.
>
> ```
> java.lang.InterruptedException: null
>     at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:944) ~[?:1.8.0_282]
>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:584) ~[blob_p-b0501fca08dc4506cf9cffe14466df74e4d010e9-d1cc5178ec372501d7c1e755b90de159:?]
> ```
> How can I gracefully stop my Flink+Beam job that uses an unbounded
> KafkaIO source?
>
> If it is not explicitly supported, are there any work-arounds?
>
> Please and thank you,
> Marco.