Thanks to Jan, I learned I needed checkpointing. But my checkpointing interval was also too short (100 ms): it caused the Kafka reader to remain alive long after I cancelled the job. I set it to 1 second and it works as expected.
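For reference, a minimal sketch of how that interval can be set on the Flink runner's options (the class name and job wiring here are illustrative, not the actual job; passing --checkpointingInterval=1000 on the command line is equivalent):

```
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointedJob {
  public static void main(String[] args) {
    // Parse args, then narrow to the Flink runner's options.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    // 100 ms was too aggressive; 1000 ms (1 second) lets the Kafka reader shut down cleanly.
    options.setCheckpointingInterval(1000L);

    Pipeline pipeline = Pipeline.create(options);
    // ... attach the KafkaIO source and the rest of the job here ...
    pipeline.run();
  }
}
```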
Thank you,
Marco

On Thu, Sep 30, 2021 at 9:44 AM Marco Costantini <[email protected]> wrote:

> Absolutely right. New findings.
>
> Checkpointing was NOT configured (!!!). So I added it. And, as you predicted,
> the exceptions no longer occur and the job cancels just fine - with one caveat.
>
> As I tail the TaskManager logs, it continues to stream:
>
> ```
> 2021-09-30 13:41:41,076 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_891889822_rktbi-dev-accounts-3, groupId=Reader-0_offset_consumer_891889822_rktbi-dev-accounts] Seeking to LATEST offset of partition accountflow.ba.create.dev-0
> 2021-09-30 13:41:41,121 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_891889822_rktbi-dev-accounts-3, groupId=Reader-0_offset_consumer_891889822_rktbi-dev-accounts] Resetting offset for partition accountflow.ba.create.dev-0 to offset 1.
> ```
>
> I've waited about 2 minutes after the Flink web UI reads that the job is
> fully cancelled and these still stream by in the logs. Any ideas?
>
> In any case, thank you very much for your help, Jan.
>
> Marco
>
> On Thu, Sep 30, 2021 at 9:11 AM Jan Lukavský <[email protected]> wrote:
>
>> Do you set --checkpointingInterval? I have seen similar behavior, but
>> only when checkpointing is disabled (missing), see [1].
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-12053
>>
>> On 9/30/21 2:14 PM, Marco Costantini wrote:
>>
>> Thanks Jan,
>> More than continuing safely, I need to be able to stop the jobs safely.
>> Currently, doing so "blows up" the TaskManager - the exceptions stream so
>> fast that the TaskManager shuts down for an unobserved reason: OOM? HDD space?
>>
>> If I connect to Kafka with KafkaIO, then click Cancel Job -> boom (the
>> exceptions start streaming in the logs).
>>
>> I've tried 'stopping' the job via the REST API but it gives a response
>> like "the module is already in Finished state. Not stopping". It is
>> correct in that one of my two pipeline stages is finished, but the other
>> is in RUNNING.
>>
>> Any tips to clean this mess up?
>>
>> On Thu, Sep 30, 2021 at 3:30 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi Marco,
>>>
>>> What is your intention? Do you want to upgrade the pipeline? Flink uses
>>> checkpoints / savepoints (see [1]), so cancelling the pipeline to a
>>> savepoint and then resuming from that savepoint should be safe. Another
>>> option is to enable offset commits to Kafka via [2]. That way you should
>>> be able to resume even without a savepoint (but you will lose any internal
>>> pipeline state, so that is mostly useful for stateless pipelines).
>>>
>>> Would that work for you?
>>>
>>> Jan
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>>>
>>> [2] https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>>>
>>> On 9/30/21 12:28 AM, Marco Costantini wrote:
>>> > Using a FlinkRunner, if I cancel the job, the pipeline blows up.
>>> > Exceptions stream across my screen rapidly.
>>> >
>>> > ```
>>> > java.lang.InterruptedException: null
>>> >   at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:944) ~[?:1.8.0_282]
>>> >   at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:584) ~[blob_p-b0501fca08dc4506cf9cffe14466df74e4d010e9-d1cc5178ec372501d7c1e755b90de159:?]
>>> > ```
>>> >
>>> > How can I gracefully stop my Flink+Beam job that uses an unbounded
>>> > KafkaIO source?
>>> >
>>> > If it is not explicitly supported, are there any work-arounds?
>>> >
>>> > Please and thank you,
>>> > Marco.
>>
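For anyone reading this thread later: a rough sketch of Jan's suggestion [2] quoted above, i.e. letting KafkaIO commit offsets back to Kafka when checkpoints finalize so a stateless pipeline can resume without a savepoint. The bootstrap servers, group id and String deserializers below are placeholders, not taken from the actual job.

```
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaSourceSketch {
  public static PCollection<KV<String, String>> readAccounts(Pipeline pipeline) {
    return pipeline.apply(
        "ReadFromKafka",
        KafkaIO.<String, String>read()
            .withBootstrapServers("kafka:9092")            // placeholder
            .withTopic("accountflow.ba.create.dev")        // topic name from the logs above
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // commitOffsetsInFinalize needs a consumer group.id to commit against.
            .withConsumerConfigUpdates(
                Collections.<String, Object>singletonMap("group.id", "accounts-reader"))
            .commitOffsetsInFinalize()
            .withoutMetadata()); // drop KafkaRecord metadata, keep KV<key, value>
  }
}
```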
