看堆栈信息是内存不够,调大一些看看。我之前在读取hdfs上的一个获取地理位置的离线库,也是内存溢出,通过调整内存大小解决的。用的streamingapi开发的作业,1.12.5版本。


| |
allanqinjy
|
|
allanqi...@163.com
|
签名由网易邮箱大师定制


On 10/8/2022 21:00,RS<tinyshr...@163.com> wrote:
Hi,


版本:Flink-1.15.1


有个场景,从hdfs读文件再处理数据,batch mode,10个并发,使用Flink 
SQL定义执行,source是connector=filesystem,format=raw,path=<HDFS文件路径>


执行任务的时候,有时候能成功,有时候失败了然后就一直失败,重启集群好像可以解决问题,这种情况如何是什么原因导致的?


集群的off-heap都是默认配置,
taskmanager.memory.task.off-heap.size=0
taskmanager.memory.framework.off-heap.size=128MB


报错堆栈:
Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct 
out-of-memory error has occurred. This can mean two things: either job(s) 
require(s) a larger size of JVM direct memory or there is a direct memory leak. 
The direct memory can be allocated by user code or some of its dependencies. In 
this case 'taskmanager.memory.task.off-heap.size' configuration option should 
be increased. Flink framework and its dependencies also consume the direct 
memory, mostly for network communication. The most of network memory is managed 
by Flink and should not result in out-of-memory error. In certain special 
cases, in particular for jobs with high parallelism, the framework may require 
more direct memory which is not managed by Flink. In this case 
'taskmanager.memory.framework.off-heap.size' configuration option should be 
increased. If the error persists then there is probably a direct memory leak in 
user code or some of its dependencies which has to be investigated and fixed. 
The task executor has to be shutdown...
at java.nio.Bits.reserveMemory(Bits.java:695)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:270)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:163)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
at 
org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
at 
org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:142)
at 
org.apache.hadoop.hdfs.ByteArrayStrategy.readFromBlock(ReaderStrategy.java:118)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:704)
at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:765)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:825)
at java.io.DataInputStream.read(DataInputStream.java:149)
at 
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:742)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:586)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:505)
at 
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:50)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:415)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:98)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:122)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:348)
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:750)


Thanks

回复