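Hi,

*- The cluster runs only this one job at a time, and the next job starts only after the previous one finishes. If the earlier job's reads/writes allocated off-heap memory, is that memory released immediately when the job finishes?*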
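On when that memory is actually freed: in a session cluster the TaskManager JVM keeps running between jobs, and the native memory behind a `ByteBuffer.allocateDirect` buffer is returned when the garbage collector reclaims the buffer object, not at a fixed point such as job completion. Any direct buffer that something still references (for example a library-level pool) therefore outlives the job. The minimal Java sketch below is only an illustration under that assumption; `DirectMemoryProbe` and `directBytesUsed` are hypothetical names, not Flink APIs. It reads the JDK's own accounting of the "direct" buffer pool through `BufferPoolMXBean`, which is the kind of number one could log before and after each job to see whether direct memory keeps growing from run to run.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Flink API): reports the JVM's accounting of direct ByteBuffer memory.
public class DirectMemoryProbe {

    /** Bytes currently reserved by direct ByteBuffers in this JVM, or -1 if the pool is not exposed. */
    public static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directBytesUsed();

        // Simulate a leak: 8 x 1 MiB direct buffers stay strongly referenced after the "job" is done.
        List<ByteBuffer> retained = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            retained.add(ByteBuffer.allocateDirect(1 << 20));
        }
        System.out.printf("before: %d bytes, while referenced: %d bytes%n", before, directBytesUsed());

        // Dropping the references does not free the native memory by itself; it is returned only
        // after the garbage collector reclaims the ByteBuffer objects. System.gc() is just a hint.
        retained.clear();
        System.gc();
        System.out.printf("after clear + GC hint: %d bytes%n", directBytesUsed());
    }
}
```

If the logged value keeps climbing across successive jobs on the same TaskManager, that points to direct buffers being retained somewhere rather than to a single job simply needing more memory.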
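Do you explicitly close the query results and any opened resources after each job run? You can also follow [1] to check whether the job itself has a memory leak.

*- The exception only appears after several runs: the earlier jobs all succeed, then the later ones start failing, which looks like a memory leak.*

Are you running your SQL jobs through the SQL Client? The SQL Client can submit multiple asynchronous jobs at the same time, so check the running jobs in the session cluster's Web UI to confirm that the earlier jobs have really finished. From the SQL Client documentation [2]:

> By default, SQL Client executes DML statements asynchronously. That means, SQL Client will submit a job for the DML statement to a Flink cluster, and not wait for the job to finish. So SQL Client can submit multiple jobs at the same time

*- Is there more material on how the Flink TaskManager manages off-heap memory?*

See whether this post helps: "Off-heap Memory in Apache Flink and the curious JIT compiler" [3]

[1] https://nodejh.com/posts/flink-%E4%BB%BB%E5%8A%A1%E5%86%85%E5%AD%98%E6%B3%84%E6%BC%8F%E5%AF%BC%E8%87%B4%E9%A2%91%E7%B9%81-full-fc-%E5%AF%BC%E8%87%B4-cpu-%E6%9A%B4%E5%A2%9E%E9%97%AE%E9%A2%98%E6%8E%92%E6%9F%A5/
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-dml-statements-syncasync
[3] https://flink.apache.org/news/2015/09/16/off-heap-memory.html?spm=a2c6h.12873639.article-detail.7.506a2e399nTOH6#appendix-detailed-micro-benchmarks

RS <tinyshr...@163.com> wrote on Tue, Oct 11, 2022 at 09:26:
> Hi,
> Increasing taskmanager.memory.task.off-heap.size should solve part of the problem.
>
> I still have some questions. I deployed a session cluster that runs only this one job at a time; the next job starts only after the previous one finishes. If the earlier job's reads/writes allocated off-heap memory, is that memory released immediately when the job finishes?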
> The exception only appears after several runs: the earlier jobs all succeed, then the later ones start failing, which looks like a memory leak. Is there more material on how the Flink TaskManager manages off-heap memory? (I have already read the official docs.)
> Thanks
>
> On 2022-10-10 12:34:55, "yanfei lei" <fredia...@gmail.com> wrote:
> >Judging from the error, this is caused by insufficient direct memory. Try increasing taskmanager.memory.task.off-heap.size.
> >
> >Best,
> >Yanfei
> >
> >allanqinjy <allanqi...@163.com> wrote on Sat, Oct 8, 2022 at 21:19:
> >
> >>
> >> The stack trace shows there is not enough memory; try increasing it. I previously hit an out-of-memory error too when reading an offline geolocation library from HDFS, and solved it by adjusting the memory size. That job was written with the streaming API on version 1.12.5.
> >>
> >> On 10/8/2022 21:00, RS <tinyshr...@163.com> wrote:
> >> Hi,
> >>
> >> Version: Flink-1.15.1
> >>
> >> Scenario: read files from HDFS and then process the data, in batch mode with a parallelism of 10, defined and executed with Flink SQL. The source is connector=filesystem, format=raw, path=<HDFS file path>.
> >>
> >> When I run the job, it sometimes succeeds; other times it fails and then keeps failing. Restarting the cluster seems to fix it. What could be causing this?
> >>
> >> The cluster's off-heap settings are all defaults:
> >> taskmanager.memory.task.off-heap.size=0
> >> taskmanager.memory.framework.off-heap.size=128MB
> >>
> >> Error stack trace:
> >> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased.
> >> If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
> >>     at java.nio.Bits.reserveMemory(Bits.java:695)
> >>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> >>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> >>     at org.apache.hadoop.util.DirectBufferPool.getBuffer(DirectBufferPool.java:72)
> >>     at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.reallocPacketBuf(PacketReceiver.java:270)
> >>     at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:163)
> >>     at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
> >>     at org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.readNextPacket(BlockReaderRemote.java:183)
> >>     at org.apache.hadoop.hdfs.client.impl.BlockReaderRemote.read(BlockReaderRemote.java:142)
> >>     at org.apache.hadoop.hdfs.ByteArrayStrategy.readFromBlock(ReaderStrategy.java:118)
> >>     at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:704)
> >>     at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:765)
> >>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:825)
> >>     at java.io.DataInputStream.read(DataInputStream.java:149)
> >>     at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
> >>     at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:742)
> >>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:586)
> >>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:505)
> >>     at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:50)
> >>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.loadSplit(ContinuousFileReaderOperator.java:415)
> >>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.access$300(ContinuousFileReaderOperator.java:98)
> >>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$ReaderState$2.prepareToProcessRecord(ContinuousFileReaderOperator.java:122)
> >>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:348)
> >>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> >>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
> >>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> >>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> >>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> >>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >>     at java.lang.Thread.run(Thread.java:750)
> >>
> >> Thanks
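To put the default off-heap settings from the original message into numbers: Flink's TaskManager memory-setup documentation describes the JVM's `-XX:MaxDirectMemorySize` as framework off-heap + task off-heap + network memory, where network memory is `taskmanager.memory.network.fraction` (default 0.1) of total Flink memory, clamped to `taskmanager.memory.network.min`/`max` (defaults 64 MB / 1 GB in 1.15). Since the error message says network memory is mostly managed by Flink, the room left for direct buffers Flink does not manage, such as the Hadoop `DirectBufferPool` allocation at the top of the stack trace, is roughly framework + task off-heap, which is about 128 MiB with the defaults and is also shared with the framework's own unmanaged use. The Java sketch below just does that arithmetic; `DirectMemoryBudget` is a hypothetical name, the 4096 MiB total Flink memory is an assumed value (the thread does not give the TaskManager size), and 256 MiB is only an illustrative task off-heap setting.

```java
// Hypothetical calculator (not a Flink API) for a TaskManager's JVM direct-memory limit.
public class DirectMemoryBudget {

    static final long MIB = 1024L * 1024L;

    /** Network memory: a fraction of total Flink memory, clamped to [minBytes, maxBytes]. */
    public static long networkBytes(long totalFlinkBytes, double fraction, long minBytes, long maxBytes) {
        long derived = (long) (totalFlinkBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, derived));
    }

    /** JVM direct-memory limit: framework off-heap + task off-heap + network memory. */
    public static long maxDirectBytes(long frameworkOffHeapBytes, long taskOffHeapBytes, long networkBytes) {
        return frameworkOffHeapBytes + taskOffHeapBytes + networkBytes;
    }

    public static void main(String[] args) {
        long totalFlink = 4096 * MIB; // assumed: the thread does not state the TaskManager size
        // 1.15 defaults: taskmanager.memory.network.fraction=0.1, .min=64mb, .max=1gb
        long network = networkBytes(totalFlink, 0.1, 64 * MIB, 1024 * MIB);
        long frameworkOffHeap = 128 * MIB; // taskmanager.memory.framework.off-heap.size default

        // 0 is the default taskmanager.memory.task.off-heap.size; 256 MiB is only illustrative.
        for (long taskOffHeap : new long[] {0L, 256 * MIB}) {
            long limit = maxDirectBytes(frameworkOffHeap, taskOffHeap, network);
            long outsideNetwork = frameworkOffHeap + taskOffHeap;
            System.out.printf("task.off-heap=%d MiB -> direct limit=%d MiB, ~%d MiB outside network memory%n",
                    taskOffHeap / MIB, limit / MIB, outsideNetwork / MIB);
        }
    }
}
```

Raising `taskmanager.memory.task.off-heap.size`, as suggested earlier in the thread, enlarges exactly the part that is not network memory; it gives a leak more headroom but does not stop one.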