Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-10 Thread Fabian Paul
Hi Dongwon,

Thanks for sharing the logs and the metrics screenshots with us. Unfortunately,
I think we need more information to further isolate the problem, so I have a
couple of suggestions.

1. Since you already set up Prometheus, can you also share the JVM memory
statistics, i.e. the DirectMemory consumption over time? I would be interested
to see whether the consumption slowly increases until the OOM happens or
whether it only spikes during the failing checkpoint.
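
For reference, the built-in JVM metrics should already cover this. With the
Prometheus reporter the direct-memory gauges are exported roughly as below,
though the exact names depend on your reporter/scope configuration, so treat
these as a guess:

  flink_taskmanager_Status_JVM_Memory_Direct_MemoryUsed
  flink_taskmanager_Status_JVM_Memory_Direct_TotalCapacity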

2. We suspect that the Kafka sink is causing the problem. Can you try to run
your pipeline with a simple DiscardingSink and see if the error keeps happening?
Maybe another component in your pipeline allocates a lot of DirectMemory and the
FlinkKafkaProducer only surfaces the problem because it is the first component
to hit the threshold.
Another option would be to test the new KafkaSink, which was released with 1.14
and is meant to replace the FlinkKafkaProducer.
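
A rough sketch of both options (Flink 1.14 class names; the broker and topic
values below are placeholders, not your actual configuration):

  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
  import org.apache.flink.connector.kafka.sink.KafkaSink;
  import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

  // Option A: temporarily replace the Kafka sink with a no-op sink to rule it out.
  stream.addSink(new DiscardingSink<>());

  // Option B: try the new KafkaSink introduced in 1.14.
  KafkaSink<String> sink =
      KafkaSink.<String>builder()
          .setBootstrapServers("broker-1:9092")              // placeholder
          .setRecordSerializer(
              KafkaRecordSerializationSchema.builder()
                  .setTopic("output-topic")                  // placeholder
                  .setValueSerializationSchema(new SimpleStringSchema())
                  .build())
          .build();
  stream.sinkTo(sink);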

Best,
Fabian

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Dongwon Kim
Hi Fabian,

Can you maybe share more about the setup and how you use the AsyncFunction
> with
> the Kafka client?

Oops, I forgot to mention in my reply to David that the Kafka producer has
nothing to do with the AsyncFunction!
I interact with Redis and a Spring Boot app in the AsyncFunction, not Kafka.
My pipeline looks like "KafkaSource -> AsyncFunction -> Window ->
(KafkaSink1, KafkaSink2)", so the Kafka producers come last.

As David already pointed out it could be indeed a Kafka bug but it could
> also
> mean that your defined async function leaks direct memory by not freeing
> some
> resources.

The problem seems to be KafkaSink2, which I recently added to the pipeline in
order to write large records (~10MB) for debugging purposes.
I just launched the pipeline without KafkaSink2 to see whether my conjecture
is right.
If it is, I'd rather give more direct memory to each task manager to avoid
this problem.
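
(For the record, these are the knobs I would touch in that case, assuming the
default memory model; a taskmanager's direct-memory limit is derived from the
off-heap and network sizes, so raising the task off-heap size is the usual way
to give the Kafka clients more room. The values below are just an example.)

  # flink-conf.yaml
  taskmanager.memory.task.off-heap.size: 512m
  taskmanager.memory.framework.off-heap.size: 128m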

We can definitely improve the metrics for the AsyncFunction and expose the
> current queue size as a followup.

Really looking forward to it. I spent many hours debugging AsyncFunction
without metrics. It would also be great to expose the number of timed-out
records as a metric.

Thanks,

Dongwon


On Tue, Nov 9, 2021 at 7:55 PM Fabian Paul  wrote:

> Hi Dongwon,
>
> Can you maybe share more about the setup and how you use the AsyncFunction
> with
> the Kafka client?
>
> As David already pointed out it could be indeed a Kafka bug but it could
> also
> mean that your defined async function leaks direct memory by not freeing
> some
> resources.
>
> We can definitely improve the metrics for the AsyncFunction and expose the
> current queue size as a followup.
>
> Best,
> Fabian


Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Dongwon Kim
Hi Jun,

Did you override AsyncFunction#timeout()?  If so, did you call
> resultFuture.complete()/completeExceptionally() in your override?  Not
> calling them can result in checkpoint timeout.

No, I've only called resultFuture.complete() in AsyncFunction.asyncInvoke()
and didn't know much about AsyncFunction.timeout(). However, looking at the
default implementation of AsyncFunction.timeout(), I'd rather get the timeout
exception, since I prefer my streaming pipeline to fail fast.
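
(For reference, a minimal sketch of what such an override could look like;
Input/Output stand in for my actual types, the counter is a hypothetical
metric registered in open(), and completing the future exceptionally is the
part Jun is pointing at:)

  @Override
  public void timeout(Input input, ResultFuture<Output> resultFuture) {
      // Hypothetical counter to track timed-out requests for monitoring.
      timedOutRequests.inc();
      // Still complete the future exceptionally so the job fails fast
      // instead of stalling the checkpoint.
      resultFuture.completeExceptionally(
          new java.util.concurrent.TimeoutException(
              "Async request timed out for: " + input));
  }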

I think I understand what you are concerned about, but, as I wrote in my reply
to David, the problem seems to be the Kafka sink which I recently added to the
pipeline in order to write large records (~10MB) for debugging purposes.

Anyway, thanks a lot for pointing out the possibility of overriding
AsyncFunction.timeout().

Best,

Dongwon

On Tue, Nov 9, 2021 at 5:53 PM Jun Qin  wrote:

> Hi Dongwon
>
> Did you override AsyncFunction#timeout()?  If so, did you call
> resultFuture.complete()/completeExceptionally() in your override?  Not
> calling them can result in checkpoint timeout.
>
> Thanks
> Jun
>
>
> On Nov 9, 2021, at 7:37 AM, Dongwon Kim  wrote:
>
> Hi David,
>
> There are currently no metrics for the async work-queue size (you should
>> be able to see the queue stats with debug logs enabled though [1]).
>
> Thanks for the input but scraping DEBUG messages into, for example,
> ElasticSearch for monitoring on Grafana is not possible in my current
> environment.
> I just defined two counters in RichAsyncFunction for tracking # sent
> requests and # finished/failed requests, respectively, and used the two
> counters to calculate the inflight requests from Prometheus.
>
> As far as I can tell from looking at the code, the async operator is able
>> to checkpoint even if the work-queue is exhausted.
>
> Oh, I didn't know that! As you pointed out and I'm going to explain below,
> the async operator might not be the source of the problem.
>
> I just hit the same situation and found that
> - # of inflight records are zero when the backpressure is getting high
> - A taskmanager complains the following error message around the time when
> the backpressure is getting high (all the others don't do):
>
>> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread
>>   [] - Uncaught exception in thread
>> 'kafka-producer-network-thread | producer-8':
>> java.lang.OutOfMemoryError: Direct buffer memory
>> at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
>> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>> ~[?:?]
>> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
>> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
>> at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
>> at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
>> at
>> sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493) ~[?:?]
>> at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
>> ~[?:?]
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>> at java.lang.Thread.run(Thread.java:829) [?:?]
>>
>
> Can it be the reason why my pipeline is stalled and ends up with the
> checkpoint timeout? I guess all the upstream tasks might fail to send data to
> the failed kafka producer and records are stacking up in buffers, which
> could result in the back-pressure. If so, is there no mechanism in Flink to
> detect such an error and send it to the job manager for debugging purposes?
>
> Best,
>
> Dongwon
>
>
> On Mon, Nov 8, 2021 at 9:21 PM David Morávek  wrote:
>
>> Hi Dongwon,
>>
>> There are currently no metrics for the 

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Piotr Nowojski
Hi All,

To me it looks like something has deadlocked, maybe due to this OOM error from
Kafka, preventing a Task from making any progress. To confirm this, Dongwon, you
could collect stack traces while the job is in such a blocked state (see the
example below). A deadlocked Kafka producer could easily explain those symptoms,
and it would be visible as extreme back pressure. Another thing to look at would
be whether the job is making any progress at all (via, for example, the
numRecordsIn/numRecordsOut metrics [1]).
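
(For example, a plain thread dump of the affected taskmanager JVM while it is
stuck, assuming shell access to the host; <taskmanager-pid> is a placeholder:)

  jstack <taskmanager-pid> > taskmanager-threads.txt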

A couple of clarifications.

> What I suspect is the capacity of the asynchronous operation because
limiting the value can cause back-pressure once the capacity is exhausted
[1].
> Although I could increase the value (...)

If you want to decrease the impact of backpressure, you should decrease the
capacity, not increase it. The more in-flight records there are in the system,
the more records need to be processed/persisted during aligned/unaligned
checkpoints.
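
(For context, the capacity is the last argument of the AsyncDataStream helpers;
a sketch with placeholder values and types:)

  // Lower capacity = fewer in-flight async requests = earlier, but cheaper, backpressure.
  DataStream<Result> enriched =
      AsyncDataStream.unorderedWait(
          input,                      // upstream DataStream<Request> (placeholder)
          new MyAsyncFunction(),      // your RichAsyncFunction (placeholder)
          30, TimeUnit.SECONDS,       // timeout per request
          50);                        // capacity: max concurrent in-flight requests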

> As far as I can tell from looking at the code, the async operator is able
to checkpoint even if the work-queue is exhausted.

Yes and no. If the work-queue is full, `AsyncWaitOperator` can be snapshotted,
but only if it is not blocked inside the `AsyncWaitOperator#processElement`
method. For a checkpoint to be executed, `AsyncWaitOperator` must finish
processing the current record and return execution to the task thread. If
the work-queue is full, `AsyncWaitOperator` will block inside the
`AsyncWaitOperator#addToWorkQueue` method until the work-queue has capacity
to accept the new element. If what I suspect is happening here is true, and
the job is deadlocked by this Kafka issue, `AsyncWaitOperator` will be
blocked indefinitely in that method.

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.14/docs/ops/metrics/#io



On Tue, Nov 9, 2021 at 11:55 AM Fabian Paul  wrote:

> Hi Dongwon,
>
> Can you maybe share more about the setup and how you use the AsyncFunction
> with
> the Kafka client?
>
> As David already pointed out it could be indeed a Kafka bug but it could
> also
> mean that your defined async function leaks direct memory by not freeing
> some
> resources.
>
> We can definitely improve the metrics for the AsyncFunction and expose the
> current queue size as a followup.
>
> Best,
> Fabian


Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Fabian Paul
Hi Dongwon,

Can you maybe share more about the setup and how you use the AsyncFunction with
the Kafka client?

As David already pointed out, it could indeed be a Kafka bug, but it could also
mean that your async function leaks direct memory by not freeing some
resources.

We can definitely improve the metrics for the AsyncFunction and expose the
current queue size as a followup.

Best,
Fabian

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Jun Qin
Hi Dongwon

Did you override AsyncFunction#timeout()?  If so, did you call 
resultFuture.complete()/completeExceptionally() in your override?  Not calling 
them can result in checkpoint timeout.

Thanks
Jun


> On Nov 9, 2021, at 7:37 AM, Dongwon Kim  wrote:
> 
> Hi David,
> 
> There are currently no metrics for the async work-queue size (you should be 
> able to see the queue stats with debug logs enabled though [1]). 
> Thanks for the input but scraping DEBUG messages into, for example, 
> ElasticSearch for monitoring on Grafana is not possible in my current 
> environment.
> I just defined two counters in RichAsyncFunction for tracking # sent requests 
> and # finished/failed requests, respectively, and used the two counters to 
> calculate the inflight requests from Prometheus.
> 
> As far as I can tell from looking at the code, the async operator is able to 
> checkpoint even if the work-queue is exhausted.
> Oh, I didn't know that! As you pointed out and I'm going to explain below, 
> the async operator might not be the source of the problem.
> 
> I just hit the same situation and found that 
> - # of inflight records are zero when the backpressure is getting high
> - A taskmanager complains the following error message around the time when 
> the backpressure is getting high (all the others don't do):
> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread   
>  [] - Uncaught exception in thread 'kafka-producer-network-thread 
> | producer-8':
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
> at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
> at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493) 
> ~[?:?]
> at java.nio.channels.SocketChannel.write(SocketChannel.java:507) 
> ~[?:?]
> at 
> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
>  ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
>  ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at org.apache.kafka.common.network.Selector.poll(Selector.java:483) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) 
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
> at java.lang.Thread.run(Thread.java:829) [?:?]
> 
> Can it be the reason why my pipeline is stalled and ends up with the checkpoint
> timeout? I guess all the upstream tasks might fail to send data to the failed 
> kafka producer and records are stacking up in buffers, which could result in 
> the back-pressure. If so, is there no mechanism in Flink to detect such an 
> error and send it to the job manager for debugging purposes?
> 
> Best,
> 
> Dongwon
> 
> 
> On Mon, Nov 8, 2021 at 9:21 PM David Morávek wrote:
> Hi Dongwon,
> 
> There are currently no metrics for the async work-queue size (you should be 
> able to see the queue stats with debug logs enabled though [1]). As far as I 
> can tell from looking at the code, the async operator is able to checkpoint 
> even if the work-queue is exhausted.
> 
> Arvid can you please validate the above? (the checkpoints not being blocked 
> by the work queue part)
> 
> [1] 
> https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109
>  
> 
> 
> Best,
> D.
> 
> On Sun, Nov 7, 2021 at 10:41 AM Dongwon Kim wrote:
> Hi community,
> 
> While using Flink's async i/o for interacting with an external system, I got 

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread David Morávek
This is definitely a bug on the Kafka side, because they're not handling
uncaught exceptions properly [1]. I don't think there is much we can do on
the Flink side here, because we're not able to override the thread factory
for the Kafka IO thread :/

[1] https://issues.apache.org/jira/browse/KAFKA-4228

On Tue, Nov 9, 2021 at 7:38 AM Dongwon Kim  wrote:

> Hi David,
>
> There are currently no metrics for the async work-queue size (you should
>> be able to see the queue stats with debug logs enabled though [1]).
>
> Thanks for the input but scraping DEBUG messages into, for example,
> ElasticSearch for monitoring on Grafana is not possible in my current
> environment.
> I just defined two counters in RichAsyncFunction for tracking # sent
> requests and # finished/failed requests, respectively, and used the two
> counters to calculate the inflight requests from Prometheus.
>
> As far as I can tell from looking at the code, the async operator is able
>> to checkpoint even if the work-queue is exhausted.
>
> Oh, I didn't know that! As you pointed out and I'm going to explain below,
> the async operator might not be the source of the problem.
>
> I just hit the same situation and found that
> - # of inflight records are zero when the backpressure is getting high
> - A taskmanager complains the following error message around the time when
> the backpressure is getting high (all the others don't do):
>
>> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread
>>   [] - Uncaught exception in thread
>> 'kafka-producer-network-thread | producer-8':
>>
>> java.lang.OutOfMemoryError: Direct buffer memory
>>
>> at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
>>
>> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>> ~[?:?]
>>
>> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
>>
>> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
>>
>> at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
>>
>> at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
>>
>> at
>> sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493) ~[?:?]
>>
>> at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
>> ~[?:?]
>>
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>
>> at java.lang.Thread.run(Thread.java:829) [?:?]
>>
>
> Can it be the reason why my pipeline is stalled and ends up with the
> checkpoint timeout? I guess all the upstream tasks might fail to send data to
> the failed kafka producer and records are stacking up in buffers, which
> could result in the back-pressure. If so, is there no mechanism in Flink to
> detect such an error and send it to the job manager for debugging purposes?
>
> Best,
>
> Dongwon
>
>
> On Mon, Nov 8, 2021 at 9:21 PM David Morávek  wrote:
>
>> Hi Dongwon,
>>
>> There are currently no metrics for the async work-queue size (you should
>> be able to see the queue stats with debug logs enabled though [1]). As far
>> as I can tell from looking at the code, the async operator is able to
>> checkpoint even if the work-queue is exhausted.
>>
>> Arvid can you please validate the above? (the checkpoints not being
>> blocked by the work queue part)
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109
>>
>> Best,
>> D.
>>
>> On Sun, Nov 7, 2021 at 10:41 AM Dongwon Kim 
>> wrote:
>>
>>> Hi community,
>>>
>>> While using Flink's async i/o for interacting with an external system, I
>>> got the following exception:
>>>
>>> 2021-11-06 10:38:35,270 INFO  
>>> 

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-08 Thread Dongwon Kim
Hi David,

There are currently no metrics for the async work-queue size (you should be
> able to see the queue stats with debug logs enabled though [1]).

Thanks for the input, but scraping DEBUG messages into, for example,
Elasticsearch for monitoring in Grafana is not possible in my current
environment.
I just defined two counters in my RichAsyncFunction, one for sent requests
and one for finished/failed requests, and used them to calculate the number
of in-flight requests in Prometheus.
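
(Roughly like this; EnrichAsyncFunction, Request, Result, and the async client
are placeholder names, and the real error handling is trimmed:)

  import java.util.Collections;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.metrics.Counter;
  import org.apache.flink.streaming.api.functions.async.ResultFuture;
  import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

  public class EnrichAsyncFunction extends RichAsyncFunction<Request, Result> {
      private transient Counter sentRequests;
      private transient Counter completedRequests;
      private transient MyAsyncClient client;  // placeholder async client, created in open()

      @Override
      public void open(Configuration parameters) {
          sentRequests = getRuntimeContext().getMetricGroup().counter("sentRequests");
          completedRequests = getRuntimeContext().getMetricGroup().counter("completedRequests");
          client = new MyAsyncClient();
      }

      @Override
      public void asyncInvoke(Request request, ResultFuture<Result> resultFuture) {
          sentRequests.inc();
          client.query(request).whenComplete((result, error) -> {
              completedRequests.inc();  // counted for both success and failure
              if (error != null) {
                  resultFuture.completeExceptionally(error);
              } else {
                  resultFuture.complete(Collections.singleton(result));
              }
          });
      }
  }
  // In Prometheus, in-flight requests ~= sentRequests - completedRequests.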

As far as I can tell from looking at the code, the async operator is able
> to checkpoint even if the work-queue is exhausted.

Oh, I didn't know that! As you pointed out and I'm going to explain below,
the async operator might not be the source of the problem.

I just hit the same situation and found that:
- the number of in-flight records is zero when the backpressure is getting high
- one taskmanager logs the following error around the time the backpressure is
getting high (the others don't):

> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread
>   [] - Uncaught exception in thread
> 'kafka-producer-network-thread | producer-8':
>
> java.lang.OutOfMemoryError: Direct buffer memory
>
> at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
>
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
> ~[?:?]
>
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
>
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
>
> at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
>
> at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
>
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
> ~[?:?]
>
> at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
> ~[?:?]
>
> at
> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>
> at
> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60)
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>
> at
> org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429)
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>
> at
> org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399)
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589)
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>
> at
> org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>
> at
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
> ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>
> at java.lang.Thread.run(Thread.java:829) [?:?]
>

Could this be the reason why my pipeline stalls and ends up with the
checkpoint timeout? I guess the upstream tasks might fail to send data to
the failed Kafka producer, so records stack up in buffers, which could
result in the back-pressure. If so, is there no mechanism in Flink to detect
such an error and report it to the job manager for debugging purposes?

Best,

Dongwon


On Mon, Nov 8, 2021 at 9:21 PM David Morávek  wrote:

> Hi Dongwon,
>
> There are currently no metrics for the async work-queue size (you should
> be able to see the queue stats with debug logs enabled though [1]). As far
> as I can tell from looking at the code, the async operator is able to
> checkpoint even if the work-queue is exhausted.
>
> Arvid can you please validate the above? (the checkpoints not being
> blocked by the work queue part)
>
> [1]
> https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109
>
> Best,
> D.
>
> On Sun, Nov 7, 2021 at 10:41 AM Dongwon Kim  wrote:
>
>> Hi community,
>>
>> While using Flink's async i/o for interacting with an external system, I
>> got the following exception:
>>
>> 2021-11-06 10:38:35,270 INFO  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
>> checkpoint 54 (type=CHECKPOINT) @ 1636162715262 for job 
>> f168a44ea33198cd71783824d49f9554.
>> 2021-11-06 10:38:47,031 INFO  
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
>> checkpoint 54 for job f168a44ea33198cd71783824d49f9554 (11930992707 bytes, 
>> checkpointDuration=11722 ms, finalizationTime=47 ms).
>> 2021-11-06 10:58:35,270 INFO  
>> 

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-08 Thread David Morávek
Hi Dongwon,

There are currently no metrics for the async work-queue size (you should be
able to see the queue stats with debug logs enabled though [1]). As far as
I can tell from looking at the code, the async operator is able to
checkpoint even if the work-queue is exhausted.

Arvid can you please validate the above? (the checkpoints not being blocked
by the work queue part)

[1]
https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109
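
(If it helps, something along these lines in the log4j.properties should enable
those logs; the logger name is my guess based on the package in [1]:)

  logger.asyncqueue.name = org.apache.flink.streaming.api.operators.async.queue
  logger.asyncqueue.level = DEBUG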

Best,
D.

On Sun, Nov 7, 2021 at 10:41 AM Dongwon Kim  wrote:

> Hi community,
>
> While using Flink's async i/o for interacting with an external system, I
> got the following exception:
>
> 2021-11-06 10:38:35,270 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 54 (type=CHECKPOINT) @ 1636162715262 for job 
> f168a44ea33198cd71783824d49f9554.
> 2021-11-06 10:38:47,031 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
> checkpoint 54 for job f168a44ea33198cd71783824d49f9554 (11930992707 bytes, 
> checkpointDuration=11722 ms, finalizationTime=47 ms).
> 2021-11-06 10:58:35,270 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering 
> checkpoint 55 (type=CHECKPOINT) @ 1636163915262 for job 
> f168a44ea33198cd71783824d49f9554.
> 2021-11-06 11:08:35,271 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Checkpoint 
> 55 of job f168a44ea33198cd71783824d49f9554 expired before completing.
> 2021-11-06 11:08:35,287 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>  [] - Trying to recover from a global failure.
>
>
> - FYI, I'm using 1.14.0 and enabled unaligned checkpointing and buffer
> debloating
> - the 55th ckpt failed to complete within 10 mins (which is the value of
> execution.checkpointing.timeout)
> - the below graph shows that backpressure skyrocketed around the time the
> 55th ckpt began
> [image: image.png]
>
> What I suspect is the capacity of the asynchronous operation because
> limiting the value can cause back-pressure once the capacity is exhausted
> [1].
>
> Although I could increase the value, I want to monitor the current
> in-flight async i/o requests like the above back-pressure graph on Grafana.
> [2] does not introduce any system metric specific to async i/o.
>
> Best,
>
> Dongwon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#system-metrics
>
>
>


how to expose the current in-flight async i/o requests as metrics?

2021-11-07 Thread Dongwon Kim
Hi community,

While using Flink's async i/o for interacting with an external system, I
ran into the following checkpoint timeout:

2021-11-06 10:38:35,270 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Triggering checkpoint 54 (type=CHECKPOINT) @ 1636162715262 for job
f168a44ea33198cd71783824d49f9554.
2021-11-06 10:38:47,031 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Completed checkpoint 54 for job f168a44ea33198cd71783824d49f9554
(11930992707 bytes, checkpointDuration=11722 ms, finalizationTime=47
ms).
2021-11-06 10:58:35,270 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Triggering checkpoint 55 (type=CHECKPOINT) @ 1636163915262 for job
f168a44ea33198cd71783824d49f9554.
2021-11-06 11:08:35,271 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
Checkpoint 55 of job f168a44ea33198cd71783824d49f9554 expired before
completing.
2021-11-06 11:08:35,287 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] -
Trying to recover from a global failure.


- FYI, I'm using 1.14.0 and enabled unaligned checkpointing and buffer
debloating
- the 55th ckpt failed to complete within 10 mins (which is the value of
execution.checkpointing.timeout)
- the below graph shows that backpressure skyrocketed around the time the
55th ckpt began
[image: image.png]

What I suspect is the capacity of the asynchronous operation, because
limiting this value can cause back-pressure once the capacity is exhausted
[1].

Although I could increase the value, I want to monitor the current number of
in-flight async i/o requests, like the back-pressure graph above, in Grafana.
[2] does not list any system metric specific to async i/o.

Best,

Dongwon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#system-metrics