Thanks for the reply, Xintong; I have watched many of your videos. A follow-up question: is there a minimum size for a slot's memory? With limited resources, I would like to split a TaskManager's memory into the smallest possible slots in order to maximize parallelism. Is there a reasonable range for this split? For example, with 1 TM at 8 GB, is there a limit on how many slots it can be split into?
________________________________
From: Xintong Song <[email protected]>
Sent: November 17, 2020 1:53
To: user-zh <[email protected]>
Subject: Re: Re: flink-1.11.2 memory overflow issue
> > OK, thanks for the reply. Then can taskmanager.memory.task.off-heap.size be set dynamically with the code below?
> > streamTableEnv.getConfig().getConfiguration().setString(key, value);
>
No, it cannot; that is a cluster-level configuration. It can be set in the flink-conf.yaml configuration file, or specified dynamically at job submission via -yD key=value.

Thank you~

Xintong Song

On Tue, Nov 17, 2020 at 9:31 AM Andrew <[email protected]> wrote:
> It should not be configurable that way; it goes through the configuration file.
> taskmanager.memory.task.off-heap.size is a TaskManager startup parameter;
>
> streamTableEnv.getConfig().getConfiguration().setString(key, value);
> is a job runtime configuration!
>
> ------------------ Original Message ------------------
> From: "user-zh" <[email protected]>;
> Sent: Monday, November 16, 2020, 7:14 PM
> To: "[email protected]" <[email protected]>;
> Subject: Re: flink-1.11.2 memory overflow issue
>
> OK, thanks for the reply. Then can taskmanager.memory.task.off-heap.size be set dynamically with the code below?
> streamTableEnv.getConfig().getConfiguration().setString(key, value);
>
> ________________________________
> From: Xintong Song <[email protected]>
> Sent: November 16, 2020 10:59
> To: user-zh <[email protected]>
> Subject: Re: flink-1.11.2 memory overflow issue
>
> Then there should be no memory leak; the job simply needs more direct memory than is available.
> As the error message suggests, you can try increasing `taskmanager.memory.task.off-heap.size`.
> Increasing only the TM's total memory will not help; it does not increase the direct memory available to the job.
>
> Thank you~
>
> Xintong Song
>
> On Mon, Nov 16, 2020 at 6:38 PM 史 正超 <[email protected]> wrote:
>
> > Flink on YARN, per-job mode. On restart the Kafka group.id is unchanged, so it should resume from the committed offsets, but the job will not start. I am not sure whether a backlog that built up over time is the cause.
> > ________________________________
> > From: Xintong Song <[email protected]>
> > Sent: November 16, 2020 10:11
> > To: user-zh <[email protected]>
> > Subject: Re: flink-1.11.2 memory overflow issue
> >
> > Which deployment mode is this? Standalone?
> > When the job was rerun after the earlier failure, were all TMs restarted, or were some of the previous TMs reused?
> >
> > Thank you~
> >
> > Xintong Song
> >
> > On Mon, Nov 16, 2020 at 5:53 PM 史 正超 <[email protected]> wrote:
> >
> > > Using RocksDB, parallelism 5, 1 TM with 5 slots, 10 GB of TM memory. Starting the job produces the error below. It had started successfully before; after running for a while it failed with an out-of-memory error, and now, restarting from the previous offsets, it will not start at all.
> > >
> > > 2020-11-16 17:44:52
> > > java.lang.OutOfMemoryError: Direct buffer memory.
> > > The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
> > > at java.nio.Bits.reserveMemory(Bits.java:658)
> > > at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> > > at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> > > at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
> > > at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:109)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:101)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:326)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
> > > at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> > > at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getRecordsFromKafka(KafkaConsumerThread.java:535)
> > > at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:264)
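
Xintong's suggestion upthread (set the option in flink-conf.yaml, or pass it at submission time) can be sketched as a minimal config fragment. This assumes Flink 1.11 on YARN per-job mode as described in the thread; the 512m and 256m values are placeholders to illustrate the two knobs named in the error message, not tuned recommendations.

```yaml
# flink-conf.yaml: cluster-level settings, read when the TaskManager starts.
# Direct memory reserved for user code and its dependencies (e.g. the Kafka client):
taskmanager.memory.task.off-heap.size: 512m
# Direct memory reserved for the Flink framework itself; the error message
# suggests raising this only for high-parallelism jobs:
taskmanager.memory.framework.off-heap.size: 256m
```

Equivalently, the same keys can be passed at submission time for a per-job YARN cluster, e.g. `flink run -yD taskmanager.memory.task.off-heap.size=512m ...`. As noted in the thread, they cannot be changed from job code via `streamTableEnv.getConfig()` once the TaskManager is running.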

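On the slot question at the top of the thread: as far as I know, Flink does not document a hard minimum slot size; the slot count is just a configuration value. The practical lower bound comes from managed memory and network buffers being divided evenly among a TM's slots, so every extra slot shrinks each slot's share. A minimal sketch, assuming the 8 GB TM from the question (the slot count here is illustrative, not a recommendation):

```yaml
# One TaskManager with an 8 GB total process budget:
taskmanager.memory.process.size: 8g
# Slots share the TM's managed memory and network buffers evenly,
# so 16 slots leave each slot roughly 1/16 of those pools:
taskmanager.numberOfTaskSlots: 16
```

Whether a given share is enough depends on the job; with RocksDB state as in this thread, RocksDB draws on managed memory by default, so the per-slot managed-memory share is what bounds how small a slot can usefully get.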