Hi Jakob,

Your revision number is fairly new and your direct memory
configuration seems to be correct for your setup. If you have the
time, you could verify that the memory flags for the JVM are set
correctly by the startup script. You can see that in the first lines
of the task manager log. If the direct memory was set to 2GB with the
default number of network buffers, the JVM should have had enough
direct memory. Still, we'd like to find out what caused your problem.

Are you running on YARN or standalone?

Yes, the usual setup is one task manager per host/VM. The task manager
will allocate all memory upfront. However, a large part of this memory
will be self-managed by Flink and not touched much by the GC. By
default, this is 0.7 of the configured heap memory. You can control
this ratio with the taskmanager.memory.fraction variable. You can also
set a fixed managed memory size using taskmanager.memory.size (MB). In
large memory setups, we have seen a slightly better performance using
off-heap memory allocation. This can be configured using
taskmanager.memory.off-heap: true.

Please let us know if you experience any further issues.

Best,
Max

On Mon, Oct 19, 2015 at 10:14 PM, Jakob Ericsson
<jakob.erics...@gmail.com> wrote:
> The revision is "Starting JobManager (Version: 0.10-SNAPSHOT, Rev:c82ebbf,
> Date:15.10.2015 @ 11:34:01 CEST)"
>
> We have a lot of memory left on the machine. I have increased it quite a
> lot.
>
> What is your thought on memory configuration?
> If I understand Flink correctly, you should only have one taskmanager
> running each host?
>
> For a pretty standard machine with 16 cores and 32-64 GB memory. This means
> that you will have one java process running with a Xmx30G or even higher for
> exhausting all memory of the machine. This is, at least for the CMS GC, not
> the most optimal configuration.
> It might be viable for G1 but we got some really serious java core dumps
> when running G1.
>
> I looked a bit on the flags that was set on the process and it seems that
> Xmx and MaxDirectMemorySize are set to the same value by the shell script.
> When I got the "java.lang.OutOfMemoryError: Direct buffer memory", I was
> running with a taskmanager.heap.mb:2048. So the direct memory buffer was set
> to 2GB.
>
> I have restarted the process with G1 again and 20GB as taskmanager.heap.mb.
> Lets see if it will be stable during the night.
>
>
> On Mon, Oct 19, 2015 at 6:31 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> You can see the revision number and the build date in the JobManager
>> log file, e.g. "Starting JobManager (Version: 0.10-SNAPSHOT,
>> Rev:1b79bc1, Date:18.10.2015 @ 20:15:08 CEST)"
>>
>> On Mon, Oct 19, 2015 at 5:53 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>> > When was the last time you updated your 0.10-SNAPSHOT Flink cluster?
>> > If it has been more than a couple of weeks, then I'd advise you to
>> > update to the latest snapshot version. There has been an issue with
>> > the calculation of the off-heap memory limit in the past.
>> >
>> > Thanks,
>> > Max
>> >
>> > On Mon, Oct 19, 2015 at 5:26 PM, Gyula Fóra <gyula.f...@gmail.com>
>> > wrote:
>> >> It's 0.10-SNAPSHOT
>> >>
>> >> Gyula
>> >>
>> >> Maximilian Michels <m...@apache.org> ezt írta (időpont: 2015. okt. 19.,
>> >> H,
>> >> 17:13):
>> >>>
>> >>> I forgot to ask you: Which version of Flink are you using? 0.9.1 or
>> >>> 0.10-SNAPSHOT?
>> >>>
>> >>> On Mon, Oct 19, 2015 at 5:05 PM, Maximilian Michels <m...@apache.org>
>> >>> wrote:
>> >>> > Hi Jakob,
>> >>> >
>> >>> > Thanks. Flink allocates its network memory as direct memory outside
>> >>> > the normal Java heap. By default, that is 64MB but can grow up to
>> >>> > 128MB on heavy network transfer. How much memory does your machine
>> >>> > have? Could it be that your upper memory bound is lower than 2048 +
>> >>> > 128 MB?
>> >>> >
>> >>> > Best,
>> >>> > Max
>> >>> >
>> >>> > On Mon, Oct 19, 2015 at 4:32 PM, Jakob Ericsson
>> >>> > <jakob.erics...@gmail.com> wrote:
>> >>> >> Hi,
>> >>> >>
>> >>> >> See answers below.
>> >>> >>
>> >>> >> /Jakob
>> >>> >>
>> >>> >> On Mon, Oct 19, 2015 at 4:03 PM, Maximilian Michels
>> >>> >> <m...@apache.org>
>> >>> >> wrote:
>> >>> >>>
>> >>> >>> Hi Jakob,
>> >>> >>>
>> >>> >>> Thank you for reporting the bug. Could you please post your
>> >>> >>> configuration here? In particular, could you please tell us the
>> >>> >>> value
>> >>> >>> of the following configuration variables:
>> >>> >>>
>> >>> >>> taskmanager.heap.mb
>> >>> >>
>> >>> >> taskmanager.heap.mb: 2048
>> >>> >>>
>> >>> >>> taskmanager.network.numberOfBuffers
>> >>> >>
>> >>> >>
>> >>> >> Default value. Not changed.
>> >>> >>
>> >>> >>>
>> >>> >>> taskmanager.memory.off-heap
>> >>> >>>
>> >>> >> Default value Not changed.
>> >>> >>
>> >>> >>>
>> >>> >>> Are you running the Flink cluster in batch or streaming mode?
>> >>> >>>
>> >>> >> Started in streaming mode. Running with two nodes. In the cluster.
>> >>> >> Also, I have set the "env.java.opts: -XX:+UseConcMarkSweepGC" due
>> >>> >> to
>> >>> >> some
>> >>> >> strange java core dumps in the G1 GC.
>> >>> >>
>> >>> >>>
>> >>> >>> Direct memory is used by Flink's network layer. My guess is that
>> >>> >>> you
>> >>> >>> have set taskmanager.heap.mb too low (it constraints the number of
>> >>> >>> direct memory at the moment).
>> >>> >>>
>> >>> >>> Thank you,
>> >>> >>> Max
>> >>> >>>
>> >>> >>>
>> >>> >>> On Mon, Oct 19, 2015 at 3:24 PM, Jakob Ericsson
>> >>> >>> <jakob.erics...@gmail.com> wrote:
>> >>> >>> > Hello,
>> >>> >>> >
>> >>> >>> > We are running into a strange problem with Direct Memory
>> >>> >>> > buffers.
>> >>> >>> > From
>> >>> >>> > what
>> >>> >>> > I know, we are not using any direct memory buffers inside our
>> >>> >>> > code.
>> >>> >>> > This is pretty trivial streaming application just doing some
>> >>> >>> > dedupliction
>> >>> >>> > and union some kafka streams.
>> >>> >>> >
>> >>> >>> > /Jakob
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > 2015-10-19 13:27:59,064 INFO
>> >>> >>> > org.apache.flink.runtime.taskmanager.Task
>> >>> >>> > - FilterAndTransform -> (Filter, Filter) (3/4) switched to
>> >>> >>> > FAILED
>> >>> >>> > with
>> >>> >>> > exception.
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> >>> >>> > java.lang.OutOfMemoryError: Direct buffer memory
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> >>> >>> >         at
>> >>> >>> > io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
>> >>> >>> >         at java.lang.Thread.run(Thread.java:745)
>> >>> >>> > Caused by: io.netty.handler.codec.DecoderException:
>> >>> >>> > java.lang.OutOfMemoryError: Direct buffer memory
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> >>> >>> >         ... 9 more
>> >>> >>> > Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>> >>> >>> >         at java.nio.Bits.reserveMemory(Bits.java:658)
>> >>> >>> >         at
>> >>> >>> > java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>> >>> >>> >         at
>> >>> >>> > java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
>> >>> >>> >         at
>> >>> >>> > io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
>> >>> >>> >         at
>> >>> >>> > io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
>> >>> >>> >         at
>> >>> >>> > io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
>> >>> >>> >         at
>> >>> >>> > io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:111)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> > io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
>> >>> >>> >         at
>> >>> >>> >
>> >>> >>> >
>> >>> >>> >
>> >>> >>> > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
>> >>> >>> >         ... 10 more
>> >>> >>> >
>> >>> >>
>> >>> >>
>
>
  • [no subject] Jakob Ericsson
    • Re: Maximilian Michels
      • Re: Jakob Ericsson
        • Re: Maximilian Michels
          • Re: Maximilian Michels
            • Re: Gyula Fóra
              • Re: Maximilian Michels
                • Re: Maximilian Michels
                • Re: Jakob Ericsson
                • Re: Maximilian Michels
                • Re: Stephan Ewen
                • Re: Jakob Ericsson

Reply via email to