Thanks, Jan.

I needed checkpointing. But also, my checkpointing interval was too short
(100 ms), which caused my Kafka reader to remain alive long after I cancelled
the job. I set it to 1 second and it now works as expected.
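
For the archives, here is a minimal sketch of the fix. FlinkPipelineOptions
and setCheckpointingInterval are the Beam Flink runner's options class and
setter; the class name and the rest of the scaffolding are placeholders:

```
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class Main {
  public static void main(String[] args) {
    // Also settable from the command line via --checkpointingInterval=1000
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
    options.setCheckpointingInterval(1000L); // 1 second, not 100 ms
    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline here, then:
    pipeline.run();
  }
}
```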

Thank you,
Marco.

On Thu, Sep 30, 2021 at 9:44 AM Marco Costantini <
[email protected]> wrote:

> Absolutely right. New findings.
>
> Checkpointing was NOT configured (!!!). So I added it. And, as you
> predicted, the exceptions no longer occur and the job cancels just fine -
> with one caveat.
>
> As I tail the TaskManager logs, the following continues to stream:
>
> ```
> 2021-09-30 13:41:41,076 INFO  org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_891889822_rktbi-dev-accounts-3, groupId=Reader-0_offset_consumer_891889822_rktbi-dev-accounts] Seeking to LATEST offset of partition accountflow.ba.create.dev-0
> 2021-09-30 13:41:41,121 INFO  org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-Reader-0_offset_consumer_891889822_rktbi-dev-accounts-3, groupId=Reader-0_offset_consumer_891889822_rktbi-dev-accounts] Resetting offset for partition accountflow.ba.create.dev-0 to offset 1.
> ```
>
> I've waited about 2 minutes after the Flink web UI reports that the job is
> fully cancelled, and these messages still stream through the logs. Any ideas?
>
> In any case, thank you very much for your help, Jan.
>
> Marco
>
>
> On Thu, Sep 30, 2021 at 9:11 AM Jan Lukavský <[email protected]> wrote:
>
>> Do you set --checkpointingInterval? I have seen similar behavior, but
>> only when checkpointing is disabled (missing), see [1].
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-12053
>> On 9/30/21 2:14 PM, Marco Costantini wrote:
>>
>> Thanks Jan,
>> More than continuing safely, I need to be able to stop the jobs safely.
>> Currently, doing so "blows up" the TaskManager, meaning the exceptions
>> stream so fast that the TaskManager shuts down for an unobserved reason:
>> OOM? Disk space?
>>
>> If I connect to Kafka with KafkaIO and then click Cancel Job -> boom: the
>> exceptions start streaming in the logs.
>>
>> I've tried 'stopping' the job via the REST API, but it gives a response
>> like "the module is already in Finished state. Not stopping". It is
>> correct in that one of my two pipeline stages is FINISHED, but the other
>> is still RUNNING.
>>
>> Any tips to clean this mess up?
>>
>> On Thu, Sep 30, 2021 at 3:30 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi Marco,
>>>
>>> what is your intention? Do you want to upgrade the pipeline? Flink uses
>>> checkpoints / savepoints (see [1]), so cancelling the pipeline with a
>>> savepoint and then resuming from that savepoint should be safe. Another
>>> option is to enable offset commits to Kafka via [2]. That way you should
>>> be able to resume even without a savepoint (but you will lose any internal
>>> pipeline state, so that is mostly useful for stateless pipelines).
>>>
>>> Would that work for you?
>>>
>>>   Jan
>>>
>>> [1]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/
>>>
>>> [2]
>>>
>>> https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
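>>>
>>> A minimal sketch of [2], in case it helps; the bootstrap servers, group
>>> id, and deserializers are placeholder choices:
>>>
>>> ```
>>> import java.util.Collections;
>>>
>>> import org.apache.beam.sdk.Pipeline;
>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>>> import org.apache.kafka.common.serialization.LongDeserializer;
>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>
>>> public class CommittingReader {
>>>   public static void main(String[] args) {
>>>     Pipeline pipeline =
>>>         Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>>>     pipeline.apply(
>>>         KafkaIO.<Long, String>read()
>>>             .withBootstrapServers("kafka:9092")       // placeholder
>>>             .withTopic("accountflow.ba.create.dev")   // topic from the logs above
>>>             .withKeyDeserializer(LongDeserializer.class)
>>>             .withValueDeserializer(StringDeserializer.class)
>>>             // commit consumed offsets back to Kafka when a checkpoint
>>>             // finalizes, so the job can resume without a savepoint
>>>             .commitOffsetsInFinalize()
>>>             // a consumer group.id is required for committing offsets
>>>             .withConsumerConfigUpdates(
>>>                 Collections.<String, Object>singletonMap(
>>>                     ConsumerConfig.GROUP_ID_CONFIG, "my-group"))
>>>             .withoutMetadata());
>>>     // ... downstream transforms, then pipeline.run()
>>>   }
>>> }
>>> ```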
>>>
>>> On 9/30/21 12:28 AM, Marco Costantini wrote:
>>> > Using a FlinkRunner, if I cancel the job, the pipeline blows up.
>>> > Exceptions stream across my screen rapidly.
>>> >
>>> > ```
>>> > java.lang.InterruptedException: null
>>> >         at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:944) ~[?:1.8.0_282]
>>> >         at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:584) ~[blob_p-b0501fca08dc4506cf9cffe14466df74e4d010e9-d1cc5178ec372501d7c1e755b90de159:?]
>>> > ```
>>> > How can I gracefully stop my Flink+Beam job that uses an unbounded
>>> > KafkaIO source?
>>> >
>>> > If it is not explicitly supported, are there any workarounds?
>>> >
>>> > Please and thank you,
>>> > Marco.
>>>
>>
