??????????????????????, ????????????;
taskmanager.memory.task.off-heap.size ????????taskmanager??????????


streamTableEnv.getConfig().getConfiguration().setString(key, value); 
????????????????????????



------------------ Original Message ------------------
From: "user-zh" <[email protected]>;
Sent: Monday, November 16, 2020, 7:14 PM
To: "[email protected]" <[email protected]>;

Subject: Re: flink-1.11.2 ?? ????????????



???????????????????????? taskmanager.memory.task.off-heap.size
???????????????? ????????????????????

streamTableEnv.getConfig().getConfiguration().setString(key, value);

________________________________
From: Xintong Song <[email protected]>
Sent: November 16, 2020 10:59
To: user-zh <[email protected]>
Subject: Re: flink-1.11.2 ?? ????????????

???????????????????????????????????? job ?????? direct ????????????
?????????????????????????????? `taskmanager.memory.task.off-heap.size` 
??????????
?????? TM ?????????????????????????? job ?????? direct ??????

Thank you~

Xintong Song



On Mon, Nov 16, 2020 at 6:38 PM ?? ???? <[email protected]> wrote:

> flink-on-yarn . per-job????????????kafka??group.id
> ????????????????offset????????????????????????????????????????????????????????????????
> ________________________________
> From: Xintong Song <[email protected]>
> Sent: November 16, 2020 10:11
> To: user-zh <[email protected]>
> Subject: Re: flink-1.11.2 ?? ????????????
>
> ??????????????????standalone??
> ?????????????????????????????????????????????????? TM ???????????????????????????? TM??
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Nov 16, 2020 at 5:53 PM ?? ???? <[email protected]> wrote:
>
> > ????????rocksdb, ????????5??1??tm, 5??slot??tm ?????? 10G????????????????????????????????????????????????????????????????????????????????????????offset????????????????????????????
> >
> > 2020-11-16 17:44:52
> > java.lang.OutOfMemoryError: Direct buffer memory. The direct
> > out-of-memory error has occurred. This can mean two things: either
> > job(s) require(s) a larger size of JVM direct memory or there is a
> > direct memory leak. The direct memory can be allocated by user code or
> > some of its dependencies. In this case
> > 'taskmanager.memory.task.off-heap.size' configuration option should be
> > increased. Flink framework and its dependencies also consume the direct
> > memory, mostly for network communication. The most of network memory is
> > managed by Flink and should not result in out-of-memory error. In
> > certain special cases, in particular for jobs with high parallelism,
> > the framework may require more direct memory which is not managed by
> > Flink. In this case 'taskmanager.memory.framework.off-heap.size'
> > configuration option should be increased. If the error persists then
> > there is probably a direct memory leak in user code or some of its
> > dependencies which has to be investigated and fixed. The task executor
> > has to be shutdown...
> >     at java.nio.Bits.reserveMemory(Bits.java:658)
> >     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> >     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> >     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
> >     at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:109)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:101)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:326)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
> >     at org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> >     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getRecordsFromKafka(KafkaConsumerThread.java:535)
> >     at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:264)
> >
>
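
The error text quoted above names `taskmanager.memory.task.off-heap.size` as the option to raise when user code or its dependencies (here, the shaded Kafka client) allocate direct memory. A minimal sketch of that change in `flink-conf.yaml` follows. The `512m` value is an illustrative assumption, not a figure from this thread, and would need to be sized against the actual job. TaskManager memory options are read when the TaskManager process starts, so they belong in the cluster configuration or on the submit command line.

```yaml
# flink-conf.yaml (sketch; 512m is an assumed example value, not from the thread)
# Direct/native memory reserved for user code and its dependencies,
# such as the Kafka consumer in the stack trace above. The default is 0 bytes.
taskmanager.memory.task.off-heap.size: 512m
```

For a YARN per-job submission on Flink 1.11, the same key can typically be passed as a dynamic property instead, for example `-yD taskmanager.memory.task.off-heap.size=512m` on `flink run -m yarn-cluster`.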
