Absolutely right. New findings: checkpointing was NOT configured (!!!). So I added it. And, as you predicted, the exceptions no longer occur and the job cancels just fine - with one caveat (more on that after the sketch below).
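For the record, here is roughly the shape of the change (the broker address and the 60-second interval are placeholders, and the `commitOffsetsInFinalize()` line is the optional offset-commit setting from your link [2] further down the thread):

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CheckpointedKafkaJob {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // The missing piece: enable Flink checkpointing. Equivalent to passing
    // --checkpointingInterval=60000 on the command line. 60s is arbitrary here.
    options.setCheckpointingInterval(60_000L);

    Pipeline p = Pipeline.create(options);
    p.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("kafka:9092") // placeholder broker address
            .withTopic("accountflow.ba.create.dev")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Optional, per [2] below: commit offsets back to Kafka when a
            // checkpoint finalizes, so a stateless pipeline can resume even
            // without a savepoint.
            .commitOffsetsInFinalize());
    // ... rest of the pipeline ...
    p.run();
  }
}
```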
The caveat: as I tail the TaskManager logs, they continue to stream:

```
2021-09-30 13:41:41,076 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_891889822_rktbi-dev-accounts-3, groupId=Reader-0_offset_consumer_891889822_rktbi-dev-accounts] Seeking to LATEST offset of partition accountflow.ba.create.dev-0
2021-09-30 13:41:41,121 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_891889822_rktbi-dev-accounts-3, groupId=Reader-0_offset_consumer_891889822_rktbi-dev-accounts] Resetting offset for partition accountflow.ba.create.dev-0 to offset 1.
```

I've waited about two minutes after the Flink web UI showed the job as fully cancelled, and these still stream by in the logs. Any ideas?

In any case, thank you very much for your help, Jan.

Marco

On Thu, Sep 30, 2021 at 9:11 AM Jan Lukavský <[email protected]> wrote:

> Do you set --checkpointingInterval? I have seen similar behavior, but only
> when checkpointing is disabled (missing), see [1].
>
> [1] https://issues.apache.org/jira/browse/BEAM-12053
>
> On 9/30/21 2:14 PM, Marco Costantini wrote:
>
> Thanks Jan,
> More than continuing safely, I need to be able to stop the jobs safely.
> Currently, doing so "blows up" the TaskManager. "Blows up" meaning that the
> exceptions stream so fast that the TaskManager shuts down for an unobserved
> reason: OOM? HDD space?
>
> If I connect to Kafka with KafkaIO, then click Cancel Job -> boom (the
> exceptions start streaming in the logs).
>
> I've tried 'stopping' the job via the REST API, but it gives a response
> like "the module is already in Finished state. Not stopping". It is
> correct in that one of my two pipeline stages is finished, but the other
> is in RUNNING.
>
> Any tips to clean this mess up?
>
> On Thu, Sep 30, 2021 at 3:30 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Marco,
>>
>> What is your intention? Do you want to upgrade the pipeline? Flink uses
>> checkpoints / savepoints (see [1]), so cancelling the pipeline to a
>> savepoint and then resuming from that savepoint should be safe. Another
>> option is to enable offset commit to Kafka via [2]. That way you should
>> be able to resume even without a savepoint (but you will lose any
>> internal pipeline state, so that is mostly useful for stateless
>> pipelines).
>>
>> Would that work for you?
>>
>> Jan
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>>
>> [2] https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>>
>> On 9/30/21 12:28 AM, Marco Costantini wrote:
>> > Using a FlinkRunner, if I cancel the job, the pipeline blows up.
>> > Exceptions stream across my screen rapidly.
>> >
>> > ```
>> > java.lang.InterruptedException: null
>> >     at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:944) ~[?:1.8.0_282]
>> >     at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:584) ~[blob_p-b0501fca08dc4506cf9cffe14466df74e4d010e9-d1cc5178ec372501d7c1e755b90de159:?]
>> > ```
>> >
>> > How can I gracefully stop my Flink+Beam job that uses an unbounded
>> > KafkaIO source?
>> >
>> > If it is not explicitly supported, are there any work-arounds?
>> >
>> > Please and thank you,
>> > Marco.
