Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-10 Thread Fabian Paul
Hi Dongwon, Thanks for sharing the logs and the metrics screenshots with us. Unfortunately, I think we need more information to further isolate the problem, so I have a couple of suggestions. 1. Since you already set up PromQL, can you also share the JVM memory statistics, i.e. DirectMemory

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Dongwon Kim
Hi Fabian, > Can you maybe share more about the setup and how you use the AsyncFunction with the Kafka client? Oops, I didn't mention in the reply to David that the Kafka producer has nothing to do with the AsyncFunction! I interact with Redis and a Spring Boot app in the AsyncFunction, not

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Dongwon Kim
Hi Jun, > Did you override AsyncFunction#timeout()? If so, did you call resultFuture.complete()/completeExceptionally() in your override? Not calling them can result in checkpoint timeout. No, I've only called resultFuture.complete() in AsyncFunction.asyncInvoke() and didn't know much about
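For reference, a minimal sketch of the pattern being discussed, assuming a hypothetical async lookup call (the class name and lookupAsync placeholder are illustrative, not from this thread): if timeout() is overridden, the ResultFuture still has to be completed there, otherwise the in-flight element never finishes and checkpoints can time out.

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Hypothetical enrichment function; lookupAsync stands in for the real Redis / Spring Boot call.
public class EnrichAsyncFunction implements AsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        lookupAsync(key).whenComplete((value, error) -> {
            if (error != null) {
                // Completing exceptionally (or with a fallback record) keeps the operator moving.
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(value));
            }
        });
    }

    @Override
    public void timeout(String key, ResultFuture<String> resultFuture) {
        // If you override timeout(), you must still complete the future here;
        // otherwise the timed-out element stays in flight and checkpoints can time out.
        resultFuture.completeExceptionally(new TimeoutException("lookup timed out for " + key));
    }

    private CompletableFuture<String> lookupAsync(String key) {
        return CompletableFuture.completedFuture(key); // placeholder for the real async client call
    }
}
```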

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Piotr Nowojski
Hi All, to me it looks like something deadlocked, maybe due to this OOM error from Kafka, preventing a Task from making any progress. To confirm, Dongwon, you could collect stack traces while the job is in such a blocked state. A deadlocked Kafka could easily explain those symptoms and it would be

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Fabian Paul
Hi Dongwon, Can you maybe share more about the setup and how you use the AsyncFunction with the Kafka client? As David already pointed out, it could indeed be a Kafka bug, but it could also mean that the async function you defined leaks direct memory by not freeing some resources. We can definitely
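A minimal sketch of the resource-handling point raised here, assuming a hypothetical AsyncLookupClient (all names are illustrative, not from this thread): creating the client once in open() and releasing it in close() is what keeps a user-defined async function from slowly leaking direct memory.

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// AsyncLookupClient is a stand-in for whatever async client (Redis, HTTP, ...) the function uses.
public class ResourceSafeAsyncFunction extends RichAsyncFunction<String, String> {

    private transient AsyncLookupClient client;

    @Override
    public void open(Configuration parameters) {
        // Create the client once per task, not per record, so its direct buffers stay bounded.
        client = new AsyncLookupClient();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        client.get(key).whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(value));
            }
        });
    }

    @Override
    public void close() {
        // Releasing the client returns its direct buffers; forgetting this is a common
        // source of the "leaks direct memory" symptom discussed above.
        if (client != null) {
            client.close();
        }
    }

    // Minimal placeholder so the sketch compiles; replace with the real client.
    private static class AsyncLookupClient implements AutoCloseable {
        CompletableFuture<String> get(String key) {
            return CompletableFuture.completedFuture(key);
        }
        @Override
        public void close() {}
    }
}
```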

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Jun Qin
Hi Dongwon, Did you override AsyncFunction#timeout()? If so, did you call resultFuture.complete()/completeExceptionally() in your override? Not calling them can result in checkpoint timeout. Thanks, Jun > On Nov 9, 2021, at 7:37 AM, Dongwon Kim wrote: > Hi David, > There are currently

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread David Morávek
This is definitely a bug on the Kafka side, because they're not handling uncaught exceptions properly [1]. I don't think there is much we can do on the Flink side here, because we're not able to override the factory for the Kafka IO thread :/ [1] https://issues.apache.org/jira/browse/KAFKA-4228 On Tue,

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-08 Thread Dongwon Kim
Hi David, > There are currently no metrics for the async work-queue size (you should be able to see the queue stats with debug logs enabled though [1]). Thanks for the input, but scraping DEBUG messages into, for example, ElasticSearch for monitoring on Grafana is not possible in my current

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-08 Thread David Morávek
Hi Dongwon, There are currently no metrics for the async work-queue size (you should be able to see the queue stats with debug logs enabled though [1]). As far as I can tell from looking at the code, the async operator is able to checkpoint even if the work-queue is exhausted. Arvid can you
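Since there is no built-in metric for the async work-queue, one possible workaround (a sketch only; counting happens inside the user function, and the metric name inFlightAsyncRequests and the lookupAsync placeholder are made up for illustration) is to track the number of outstanding requests in the AsyncFunction itself and expose it as a gauge:

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Counts the requests started/finished by this function and publishes the difference as a gauge.
public class InFlightCountingAsyncFunction extends RichAsyncFunction<String, String> {

    private transient AtomicInteger inFlight;

    @Override
    public void open(Configuration parameters) {
        inFlight = new AtomicInteger(0);
        getRuntimeContext()
                .getMetricGroup()
                .gauge("inFlightAsyncRequests", (Gauge<Integer>) () -> inFlight.get());
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        inFlight.incrementAndGet();
        lookupAsync(key).whenComplete((value, error) -> {
            inFlight.decrementAndGet();
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(value));
            }
        });
    }

    private CompletableFuture<String> lookupAsync(String key) {
        return CompletableFuture.completedFuture(key); // placeholder for the real async call
    }
}
```

The gauge is registered per parallel subtask, so a reporter such as Prometheus would show one in-flight count per task, which can then be summed or graphed per job.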

how to expose the current in-flight async i/o requests as metrics?

2021-11-07 Thread Dongwon Kim
Hi community, While using Flink's async i/o for interacting with an external system, I got the following exception: 2021-11-06 10:38:35,270 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Triggering checkpoint 54 (type=CHECKPOINT) @ 1636162715262 for job