Thanks Julian. Appreciated your help. I will check task manager logs and decide pick which approah.
Best regards Rainie On Thu, Mar 4, 2021 at 3:04 PM Jaffe, Julian <julianja...@activision.com> wrote: > Hey Rainie, > > > > Kafka internally attempts to retry topic metadata fetches if possible. If > you think the root cause was just due to network congestion or the like, > you might want to look into increasing `request.timeout.ms`. Because of > the internal retry attempts, however, this exception usually means that the > client was unable to connect to the brokers in a more permanent way. The > usual culprits are SSL topics with plaintext consumers or vice versa, SSL > topics where the client was unable to fetch the necessary credentials, > incorrect bootstrap servers configs, and cluster maintenance. Since your > job was running successfully for a long time before crashing, I’d check the > task manager logs for the period just before the error for any warnings or > errors from Kafka to see if they shed light. If you’re dynamically loading > SSL certs or using cnames to provide stable bootstrap server identifiers, > it can be non-obvious if your config changed mid-operation, but often times > some part of the stack will log a warning message or swallowed exception. > If you have verbose logging, you might also be logging the Kafka connection > properties each time they’re regenerated. > > > > TL;DR: > > > > If you think the issue is due to high latency in communicating with your > Kafka cluster, increase your configured timeouts. > > > > If you think the issue is due to (possibly temporary) dynamic config > changes, check your error handling. Retrying the fetch from the Flink side > will only help if the cause of the time is very transient. > > > > Julian > > > > *From: *Rainie Li <raini...@pinterest.com> > *Date: *Thursday, March 4, 2021 at 1:49 PM > *To: *"matth...@ververica.com" <matth...@ververica.com> > *Cc: *user <user@flink.apache.org>, Chesnay Schepler <ches...@apache.org> > *Subject: *Re: Flink application kept restarting > > > > Hi Matthias, > > > > Do you have any suggestions to handle timeout issues when fetching data > from a Kafka topic? > > I am thinking of adding a retry logic into flink job, not sure if this is > the right direction. > > > > Thanks again > > Best regards > > Rainie > > > > On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl <matth...@ververica.com> > wrote: > > Hi Rainie, > > in general buffer pools being destroyed usually mean that some other > exception occurred that caused the task to fail and in the process of > failure handling the operator-related network buffer is destroyed. That > causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your > case. It looks like you had some timeout problem while fetching data from a > Kafka topic. > > > > Matthias > > > > On Tue, Mar 2, 2021 at 10:39 AM Rainie Li <raini...@pinterest.com> wrote: > > Thanks for checking, Matthias. > > > > I have another flink job which failed last weekend with the same buffer > pool destroyed error. This job is also running version 1.9. > > Here is the error I found from the task manager log. Any suggestion what > is the root cause and how to fix it? > > > > 2021-02-28 00:54:45,943 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Error > while canceling task. > java.lang.RuntimeException: Buffer pool is destroyed. > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) > at > com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) > -- > at > org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.flink.runtime.io.network.partition.ProducerFailedException: > org.apache.kafka.common.errors.TimeoutException: Timeout expired while > fetching topic metadata > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:107) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:170) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307) > at > org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1452) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:959) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:86) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474) > ... 2 more > Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout > expired while fetching topic metadata > > > > > > Thanks again! > > Best regards > > Rainie > > > > On Mon, Mar 1, 2021 at 4:38 AM Matthias Pohl <matth...@ververica.com> > wrote: > > Another question is: The timeout of 48 hours sounds strange. There should > have been some other system noticing the connection problem earlier > assuming that you have a reasonably low heartbeat interval configured. > > > > Matthias > > > > On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl <matth...@ververica.com> > wrote: > > Thanks for providing this information, Rainie. Are other issues documented > in the logs besides the TimeoutException in the JM logs which you already > shared? For now, it looks like that there was a connection problem between > the TaskManager and the JobManager that caused the shutdown of the operator > resulting in the NetworkBufferPool to be destroyed. For this scenario I > would expect other failures to occur besides the ones you shared. > > > > Best, > Matthias > > > > On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <raini...@pinterest.com> wrote: > > Thank you Mattias. > > It’s version1.9. > > > > Best regards > > Rainie > > > > On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <matth...@ververica.com> > wrote: > > Hi Rainie, > > the network buffer pool was destroyed for some reason. This happens when > the NettyShuffleEnvironment gets closed which is triggered when an operator > is cleaned up, for instance. Maybe, the timeout in the metric system caused > this. But I'm not sure how this is connected. I'm gonna add Chesnay to this > conversation hoping that he can give more insights. > > > > If I may ask: What Flink version are you using? > > > > Thanks, > Matthias > > > > > > On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <raini...@pinterest.com> wrote: > > Hi All, > > > > Our flink application kept restarting and it did lots of RPC calls to a > dependency service. > > > > *We saw this exception from failed task manager log: * > > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179) > at > com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173) > at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208) > Caused by: java.lang.RuntimeException: Buffer pool is destroyed. > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203) > at > com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189) > at > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) > ... 23 more > Caused by: java.lang.IllegalStateException: Buffer pool is destroyed. > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) > ... 32 more > > > > *We also saw this exception from Job manager log:* > > 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting > - Association to [akka.tcp://flink-metrics@host:38593] > with UID [-1261564990] irrecoverably failed. Quarantining address. > java.util.concurrent.TimeoutException: Remote system has been silent for > too long. (more than 48.0 hours) > at > akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > > > This app has been running fine for a month. > > Any suggestion what could cause the issue? Any suggestions on how to debug > it? > > Appreciated all advice. > > > > Thanks > > Best regards > > Rainie > >