一般情况下是内存太小了,导致的问题 应聘程序员 北京邮电大学 <[email protected]> 于2019年4月9日周二 下午1:37写道:
> hi, 大家好! > 今天运行flink时抛出了Buffer pool is > destroyed异常,数据源是kafka;消费前kafka队列中堆积了8G左右的数据。详细报错如下: > > > > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > > at java.util.ArrayList.forEach(ArrayList.java:1257) > > at > ai.momenta.supernova.galactic.telemetrystreaming.flink.riskidentification.map.TelemetryEventMapper.lambda$map$ad330a96$1(TelemetryEventMapper.java:74) > > at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) > > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) > > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) > > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:187) > > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:152) > > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665) > > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94) > > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) > > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > > at java.lang.Thread.run(Thread.java:748) > > Caused by: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > > at > org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) > > ... 25 more > > Caused by: java.lang.RuntimeException: Buffer pool is destroyed. > > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) > > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > > at > ai.momenta.supernova.galactic.telemetrystreaming.flink.riskidentification.watermark.TelemetryEventWatermarker$1.processElement(TelemetryEventWatermarker.java:30) > > at > ai.momenta.supernova.galactic.telemetrystreaming.flink.riskidentification.watermark.TelemetryEventWatermarker$1.processElement(TelemetryEventWatermarker.java:24) > > at > org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) > > ... 31 more > > Caused by: java.lang.IllegalStateException: Buffer pool is destroyed. > > at org.apache.flink.runtime.io > .network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:244) > > at org.apache.flink.runtime.io > .network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218) > > at org.apache.flink.runtime.io > .network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:236) > > at org.apache.flink.runtime.io > .network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:229) > > at org.apache.flink.runtime.io > .network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:149) > > at org.apache.flink.runtime.io > .network.api.writer.RecordWriter.emit(RecordWriter.java:128) > > at org.apache.flink.runtime.io > .network.api.writer.RecordWriter.emit(RecordWriter.java:101) > > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) > > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) > > ... 40 more > > > > > > > > > > > 有没有大佬遇到过类似的问题,给点建议。 > 谢谢。 > > > > > >
