Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Maximilian Michels
Hi Robert,

Just a quick update: The issue has been resolved in the latest Maven
0.10-SNAPSHOT dependency.

Cheers,
Max

On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
 wrote:
> Hi Max,
>
> thanks for your quick reply. I found the relevant code and commented it out
> for testing, seems to be working. Happily waiting for the fix. Thanks again.
>
> Robert
>
> On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels  wrote:
>>
>> Hi Robert,
>>
>> This is a regression on the current master due to changes in the way
>> Flink calculates the memory and sets the maximum direct memory size.
>> We introduced these changes when we merged support for off-heap
>> memory. This is not a problem in the way Flink deals with managed
>> memory; it's just that -XX:MaxDirectMemorySize is set too low. By default the
>> maximum direct memory is only used by the network stack. The network
>> library we use allocates more direct memory than we expected.
>>
>> We'll push a fix to the master as soon as possible. Thank you for
>> reporting and thanks for your patience.
>>
>> Best regards,
>> Max
>>
>> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
>>  wrote:
>> > Hi everyone,
>> >
>> > I'm constantly running into OutOfMemoryErrors and for the life of me I
>> > cannot figure out what's wrong. Let me describe my setup. I'm running
>> > the
>> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
>> > unfinished implementation of TPC-H Q2
>> >
>> > (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
>> > I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of memory
>> > per
>> > machine. This is what I believe to be the relevant section of my
>> > yarn-site.xml:
>> >
>> >
>> > <property>
>> >   <name>yarn.nodemanager.resource.memory-mb</name>
>> >   <value>57344</value>
>> > </property>
>> >
>> > <property>
>> >   <name>yarn.scheduler.maximum-allocation-mb</name>
>> >   <value>55296</value>
>> > </property>
>> >
>> > <property>
>> >   <name>yarn.nodemanager.vmem-check-enabled</name>
>> >   <value>false</value>
>> > </property>
>> >
>> >
>> > And this is how I submit the job:
>> >
>> >
>> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7
>> > .
>> >
>> >
>> > The TMs happily report:
>> >
>> > .
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > -  JVM Options:
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > - -Xms24511m
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > - -Xmx24511m
>> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> > - -XX:MaxDirectMemorySize=65m
>> > .
>> >
>> >
>> > I've tried various combinations of YARN and Flink options, to no avail.
>> > I
>> > always end up with the following stacktrace:
>> >
>> >
>> >
>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> > java.lang.OutOfMemoryError: Direct buffer memory
>> > at
>> >
>> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> > at
>> >
>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> > at
>> >
>> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
>> > at
>> >
>> > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> > at
>> >
>> > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>> > at
>> >
>> > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>> > at
>> >
>> > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> > at
>> >
>> > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> > at
>> >
>> > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> > at
>> >

Re: DataSet transformation

2015-10-01 Thread Robert Metzger
Hi,

For that you have to collect the DataSet to your local machine and then
transform the collection into the array.
Note that this is only advised for small data sets.
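
Something along these lines should do it (a minimal sketch in the Scala API,
reusing the case class from your question; the object name and the sample
values are just placeholders):

import org.apache.flink.api.scala._

case class Spectrum(mz: Float, intensity: Float)

object SpectrumToArray {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Stand-in data; in practice this is the DataSet you read your data into.
    val spectra: DataSet[Spectrum] =
      env.fromElements(Spectrum(100.1f, 0.5f), Spectrum(200.2f, 0.8f))

    // collect() ships the DataSet to the client as a Seq[Spectrum];
    // each element becomes one row of the two-dimensional array.
    val matrix: Array[Array[Float]] =
      spectra.collect().map(s => Array(s.mz, s.intensity)).toArray

    matrix.foreach(row => println(row.mkString(", ")))
  }
}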

Robert



On Thu, Oct 1, 2015 at 2:13 PM, Lydia Ickler 
wrote:

> Hi all,
>
> so I have a case class Spectrum(mz: Float, intensity: Float)
> and a DataSet[Spectrum] to read my data in.
>
> Now I want to know if there is a smart way to transform my DataSet into a
> two-dimensional Array?
>
> Thanks in advance,
> Lydia
>
>


DataSet transformation

2015-10-01 Thread Lydia Ickler
Hi all,

so I have a case class Spectrum(mz: Float, intensity: Float)
and a DataSet[Spectrum] to read my data in.

Now I want to know if there is a smart way to transform my DataSet into a
two-dimensional Array?

Thanks in advance,
Lydia



kryo exception due to race condition

2015-10-01 Thread Stefano Bortoli
Hi guys,

I hit a Kryo exception while running a process 'crossing' POJO datasets. I
am using the 0.10-milestone-1.
Checking the serializer:
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)

I have noticed that the Kryo instance is reused across serialization calls
(e.g. line 187). However, Kryo is not thread-safe, and therefore I think it
may cause the problem due to a possible race condition. We have solved these
types of issues before with a KryoFactory implementing a pool. Perhaps it
would just be a matter of calling the
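
To illustrate the KryoFactory idea, a minimal per-thread factory could look
like the following (only a sketch of the approach, not Flink's code; all class
registrations would have to be repeated for every instance created here):

import com.esotericsoftware.kryo.Kryo

object KryoFactory {
  // Hand each thread its own Kryo instance, since Kryo is not thread-safe.
  private val kryos = new ThreadLocal[Kryo] {
    override def initialValue(): Kryo = {
      val kryo = new Kryo()
      // Repeat the same registrations on every instance so class IDs match,
      // e.g. kryo.register(classOf[MyPojo])  // MyPojo is a placeholder
      kryo
    }
  }

  def get(): Kryo = kryos.get()
}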

What should I do? Open a ticket?

Thanks a lot guys for the great job!

saluti,
Stefano

-
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
114
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at
org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
at
org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
at
org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
at
org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
at
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
at
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)


Re: All but one TMs connect when JM has more than 16G of memory

2015-10-01 Thread Robert Schmidtke
So for anyone who is interested, here are some code references for getting
started with Flink on Slurm.

I added basic start and stop scripts for Flink on Slurm in my fork:
https://github.com/robert-schmidtke/flink/tree/flink-slurm/flink-dist/src/main/flink-bin/bin

And I also created an example of how to configure and run it:
https://github.com/robert-schmidtke/flink-slurm/blob/master/flink-slurm-example.sh

I'm not sure I will put much more effort into this because it works for my
setup right now. However, if there's wider interest I can add a bit more
documentation and insight.

Robert

On Thu, Oct 1, 2015 at 11:51 AM, Robert Metzger  wrote:

> Feel free to contribute documentation to Flink on how to run Flink on
> SLURM.
>
> On Thu, Oct 1, 2015 at 11:45 AM, Robert Schmidtke 
> wrote:
>
>> I see, thanks for the info. I only have access to my cluster via SLURM
>> and we don't have ssh between our nodes which is why I haven't really
>> considered the Standalone mode. A colleague has set up YARN on SLURM and it
>> was just the easiest to use. I briefly looked into the Flink Standalone
>> mode but dropped it because I thought YARN would be possible after all. It
>> seems I'm going to have a deeper look into starting the master and slaves
>> with SLURM's srun instead of ssh (I guess a slight modification of
>> start-cluster.sh should do the job).
>>
>> On Thu, Oct 1, 2015 at 11:30 AM, Robert Metzger 
>> wrote:
>>
>>> Hi,
>>> there is currently no option for forcing certain containers onto
>>> specific machines.
>>> For running the JM (or any other YARN container) on the AM host, you
>>> first need to have a NodeManager running on the host with the RM. Maybe
>>> YARN is smart enough to schedule the small JM container onto that machine.
>>>
>>> I don't know your exact setup, but maybe it would make sense for you to
>>> run Flink in the standalone cluster mode instead of with YARN. It seems that
>>> you have a very good idea how and where you want to run the Flink services
>>> in your cluster. YARN is designed to be an abstraction between the cluster
>>> and the application, that's why it's a bit difficult to schedule the
>>> containers to specific machines.
>>>
>>> Robert
>>>
>>>
>>>
>>> On Thu, Oct 1, 2015 at 11:24 AM, Robert Schmidtke <
>>> ro.schmid...@gmail.com> wrote:
>>>
 Hi Robert,

 I had a job failure yesterday with what I believe is the setup I have
 described above. However when trying to reproduce now, the behavior is the
 same: Flink waiting for resources to become available. So no hard error.

 Ok, the looping makes sense then. I haven't thought about shared
 setups. I'm still figuring out how all parameters play together, i.e. -yn,
 -yjm, -ytm and the memory limits in yarn-site.xml. This will need some
 testing and I'll come back with a proper description once I think I know
 what's going on.

 When running Flink on YARN, is it easily possible to place the Flink JM
 where the YARN Resource Manager sits, and all the TMs with the remaining
 Node Managers?

 Robert

 On Thu, Oct 1, 2015 at 10:53 AM, Robert Metzger 
 wrote:

> Hi,
>
> It is interesting to note that when I set both 
> yarn.nodemanager.resource.memory-mb
>> and yarn.scheduler.maximum-allocation-mb to 56G I get a proper error
>> when requesting 56G and 1M, but when setting 
>> yarn.nodemanager.resource.memory-mb
>> to 56G and yarn.scheduler.maximum-allocation-mb to 54G I don't get
>> an error but the aforementioned endless loop.
>
>
> Is it a "hard error" (failing) you're getting, or just "WARN" log
> messages? I'm asking because I added some code some time ago to do some
> checks before deploying Flink on YARN. These checks will print WARN log
> messages if the requested YARN session/job does not fit onto the cluster.
> This "endless loop" exists because in many production environments
> Flink can just wait for resources to become available, for example when
> other containers are finishing.
>
>
> Robert
>
> On Wed, Sep 30, 2015 at 6:33 PM, Robert Schmidtke <
> ro.schmid...@gmail.com> wrote:
>
>> Hi Robert,
>>
>> thanks for your reply. It got me digging into my setup and I
>> discovered that one TM was scheduled next to the JM. When specifying -yn 
>> 7
>> the documentation suggests that this is the number of TMs (of which I
>> wanted 7), and I thought an additional container would be used for the JM
>> (my YARN cluster has 8 containers). Anyway with this setup the memory 
>> added
>> up to 56G and 1M (40G per TM and 16G 1M for the JM), but I set a hard
>> maximum of 56G in my yarn-site.xml which is why the request could not be
>> fulfilled. It is interesting to note that when I set
>> both yarn.nodemanager.resource.memory-mb

Re: kryo exception due to race condition

2015-10-01 Thread Stephan Ewen
This looks to me like a bug where type registrations are not properly
forwarded to all Serializers.

Can you open a JIRA ticket for this?
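
(For reference, type registrations are normally declared up front on the
execution environment. Below is a hedged sketch in the Scala API with a
placeholder class; the registerKryoType call is taken from the public
ExecutionConfig API and is not the code path that is failing here:)

import org.apache.flink.api.scala._

// Placeholder type standing in for the POJOs of the failing job.
class MyPojo(var id: Int, var name: String) {
  def this() = this(0, "")
}

object RegistrationExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Registering the type up front should give every KryoSerializer instance
    // the same class-to-ID mapping; the suspicion above is that such
    // registrations are not forwarded to all serializer instances.
    env.getConfig.registerKryoType(classOf[MyPojo])
  }
}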

On Thu, Oct 1, 2015 at 6:46 PM, Stefano Bortoli  wrote:

> Hi guys,
>
> I hit a Kryo exception while running a process 'crossing' POJOs datasets.
> I am using the 0.10-milestone-1.
> Checking the serializer:
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
>
> I have noticed that the Kryo instance is reused across serialization calls
> (e.g. line 187). However, Kryo is not thread-safe, and therefore I think it
> may cause the problem due to a possible race condition. We have solved these
> types of issues before with a KryoFactory implementing a pool. Perhaps it
> would just be a matter of calling the
>
> What should I do? Open a ticket?
>
> Thanks a lot guys for the great job!
>
> saluti,
> Stefano
>
> -
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
> ID: 114
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
> at
> org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
> at
> org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
> at
> org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
> at
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
> at
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
> at java.lang.Thread.run(Thread.java:745)
>


Re: All but one TMs connect when JM has more than 16G of memory

2015-10-01 Thread Robert Schmidtke
I see, thanks for the info. I only have access to my cluster via SLURM and
we don't have ssh between our nodes which is why I haven't really
considered the Standalone mode. A colleague has set up YARN on SLURM and it
was just the easiest to use. I briefly looked into the Flink Standalone
mode but dropped it because I thought YARN would be possible after all. It
seems I'm going to have a deeper look into starting the master and slaves
with SLURM's srun instead of ssh (I guess a slight modification of
start-cluster.sh should do the job).

On Thu, Oct 1, 2015 at 11:30 AM, Robert Metzger  wrote:

> Hi,
> there is currently no option for forcing certain containers onto specific
> machines.
> For running the JM (or any other YARN container) on the AM host, you first
> need to have a NodeManager running on the host with the RM. Maybe YARN is
> smart enough to schedule the small JM container onto that machine.
>
> I don't know your exact setup, but maybe it would make sense for you to
> run Flink in the standalone cluster mode instead of with YARN. It seems that
> you have a very good idea how and where you want to run the Flink services
> in your cluster. YARN is designed to be an abstraction between the cluster
> and the application, that's why it's a bit difficult to schedule the
> containers to specific machines.
>
> Robert
>
>
>
> On Thu, Oct 1, 2015 at 11:24 AM, Robert Schmidtke 
> wrote:
>
>> Hi Robert,
>>
>> I had a job failure yesterday with what I believe is the setup I have
>> described above. However when trying to reproduce now, the behavior is the
>> same: Flink waiting for resources to become available. So no hard error.
>>
>> Ok, the looping makes sense then. I haven't thought about shared setups.
>> I'm still figuring out how all parameters play together, i.e. -yn, -yjm,
>> -ytm and the memory limits in yarn-site.xml. This will need some testing
>> and I'll come back with a proper description once I think I know what's
>> going on.
>>
>> When running Flink on YARN, is it easily possible to place the Flink JM
>> where the YARN Resource Manager sits, and all the TMs with the remaining
>> Node Managers?
>>
>> Robert
>>
>> On Thu, Oct 1, 2015 at 10:53 AM, Robert Metzger 
>> wrote:
>>
>>> Hi,
>>>
>>> It is interesting to note that when I set both 
>>> yarn.nodemanager.resource.memory-mb
 and yarn.scheduler.maximum-allocation-mb to 56G I get a proper error
 when requesting 56G and 1M, but when setting 
 yarn.nodemanager.resource.memory-mb
 to 56G and yarn.scheduler.maximum-allocation-mb to 54G I don't get an
 error but the aforementioned endless loop.
>>>
>>>
>>> Is it a "hard error" (failing) you're getting, or just "WARN" log
>>> messages? I'm asking because I added some code some time ago to do some
>>> checks before deploying Flink on YARN. These checks will print WARN log
>>> messages if the requested YARN session/job does not fit onto the cluster.
>>> This "endless loop" exists because in many production environments Flink
>>> can just wait for resources to become available, for example when other
>>> containers are finishing.
>>>
>>>
>>> Robert
>>>
>>> On Wed, Sep 30, 2015 at 6:33 PM, Robert Schmidtke <
>>> ro.schmid...@gmail.com> wrote:
>>>
 Hi Robert,

 thanks for your reply. It got me digging into my setup and I discovered
 that one TM was scheduled next to the JM. When specifying -yn 7 the
 documentation suggests that this is the number of TMs (of which I wanted
 7), and I thought an additional container would be used for the JM (my YARN
 cluster has 8 containers). Anyway with this setup the memory added up to
 56G and 1M (40G per TM and 16G 1M for the JM), but I set a hard maximum of
 56G in my yarn-site.xml which is why the request could not be fulfilled. It
 is interesting to note that when I set
 both yarn.nodemanager.resource.memory-mb
 and yarn.scheduler.maximum-allocation-mb to 56G I get a proper error when
 requesting 56G and 1M, but when setting yarn.nodemanager.resource.memory-mb
 to 56G and yarn.scheduler.maximum-allocation-mb to 54G I don't get an error
 but the aforementioned endless loop. Note I
 have yarn.nodemanager.vmem-check-enabled set to false. This is probably a
 YARN issue then / my bad configuration.

 I'm in a rush now (to get to the Flink meetup) and thus will check the
 documentation later to see how to deploy the TMs and JM on separate
 machines each, since that is not what's happening at the moment, but this
 is what I'd like to have. Thanks again and see you in an hour.

 Cheers
 Robert

 On Wed, Sep 30, 2015 at 5:19 PM, Robert Metzger 
 wrote:

> Hi Robert,
>
> the problem here is that YARN's scheduler (there are different
> schedulers in YARN: FIFO, CapacityScheduler, ...) is not giving Flink's
> ApplicationMaster/JobManager 

Re: All but one TMs connect when JM has more than 16G of memory

2015-10-01 Thread Robert Metzger
Feel free to contribute documentation to Flink on how to run Flink on
SLURM.

On Thu, Oct 1, 2015 at 11:45 AM, Robert Schmidtke 
wrote:

> I see, thanks for the info. I only have access to my cluster via SLURM and
> we don't have ssh between our nodes which is why I haven't really
> considered the Standalone mode. A colleague has set up YARN on SLURM and it
> was just the easiest to use. I briefly looked into the Flink Standalone
> mode but dropped it because I thought YARN would be possible after all. It
> seems I'm going to have a deeper look into starting the master and slaves
> with SLURM's srun instead of ssh (I guess a slight modification of
> start-cluster.sh should do the job).
>
> On Thu, Oct 1, 2015 at 11:30 AM, Robert Metzger 
> wrote:
>
>> Hi,
>> there is currently no option for forcing certain containers onto specific
>> machines.
>> For running the JM (or any other YARN container) on the AM host, you
>> first need to have a NodeManager running on the host with the RM. Maybe
>> YARN is smart enough to schedule the small JM container onto that machine.
>>
>> I don't know your exact setup, but maybe it would make sense for you to
>> run Flink in the standalone cluster mode instead of with YARN. It seems that
>> you have a very good idea how and where you want to run the Flink services
>> in your cluster. YARN is designed to be an abstraction between the cluster
>> and the application, that's why it's a bit difficult to schedule the
>> containers to specific machines.
>>
>> Robert
>>
>>
>>
>> On Thu, Oct 1, 2015 at 11:24 AM, Robert Schmidtke > > wrote:
>>
>>> Hi Robert,
>>>
>>> I had a job failure yesterday with what I believe is the setup I have
>>> described above. However when trying to reproduce now, the behavior is the
>>> same: Flink waiting for resources to become available. So no hard error.
>>>
>>> Ok, the looping makes sense then. I haven't thought about shared setups.
>>> I'm still figuring out how all parameters play together, i.e. -yn, -yjm,
>>> -ytm and the memory limits in yarn-site.xml. This will need some testing
>>> and I'll come back with a proper description once I think I know what's
>>> going on.
>>>
>>> When running Flink on YARN, is it easily possible to place the Flink JM
>>> where the YARN Resource Manager sits, and all the TMs with the remaining
>>> Node Managers?
>>>
>>> Robert
>>>
>>> On Thu, Oct 1, 2015 at 10:53 AM, Robert Metzger 
>>> wrote:
>>>
 Hi,

 It is interesting to note that when I set both 
 yarn.nodemanager.resource.memory-mb
> and yarn.scheduler.maximum-allocation-mb to 56G I get a proper error
> when requesting 56G and 1M, but when setting 
> yarn.nodemanager.resource.memory-mb
> to 56G and yarn.scheduler.maximum-allocation-mb to 54G I don't get an
> error but the aforementioned endless loop.


 Is it a "hard error" (failing) you're getting, or just "WARN" log
 messages? I'm asking because I added some code some time ago to do some
 checks before deploying Flink on YARN. These checks will print WARN log
 messages if the requested YARN session/job does not fit onto the cluster.
 This "endless loop" exists because in many production environments
 Flink can just wait for resources to become available, for example when
 other containers are finishing.


 Robert

 On Wed, Sep 30, 2015 at 6:33 PM, Robert Schmidtke <
 ro.schmid...@gmail.com> wrote:

> Hi Robert,
>
> thanks for your reply. It got me digging into my setup and I
> discovered that one TM was scheduled next to the JM. When specifying -yn 7
> the documentation suggests that this is the number of TMs (of which I
> wanted 7), and I thought an additional container would be used for the JM
> (my YARN cluster has 8 containers). Anyway with this setup the memory 
> added
> up to 56G and 1M (40G per TM and 16G 1M for the JM), but I set a hard
> maximum of 56G in my yarn-site.xml which is why the request could not be
> fulfilled. It is interesting to note that when I set
> both yarn.nodemanager.resource.memory-mb
> and yarn.scheduler.maximum-allocation-mb to 56G I get a proper error when
> requesting 56G and 1M, but when setting 
> yarn.nodemanager.resource.memory-mb
> to 56G and yarn.scheduler.maximum-allocation-mb to 54G I don't get an 
> error
> but the aforementioned endless loop. Note I
> have yarn.nodemanager.vmem-check-enabled set to false. This is probably a
> YARN issue then / my bad configuration.
>
> I'm in a rush now (to get to the Flink meetup) and thus will check the
> documentation later to see how to deploy the TMs and JM on separate
> machines each, since that is not what's happening at the moment, but this
> is what I'd like to have. Thanks again and see you in an hour.
>
> Cheers
> Robert

Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Maximilian Michels
By the way, you might have to use the "-U" flag to force Maven to
update its dependencies:  mvn -U clean install -DskipTests

On Thu, Oct 1, 2015 at 10:19 AM, Robert Schmidtke
 wrote:
> Sweet! I'll pull it straight away. Thanks!
>
> On Thu, Oct 1, 2015 at 10:18 AM, Maximilian Michels  wrote:
>>
>> Hi Robert,
>>
>> Just a quick update: The issue has been resolved in the latest Maven
>> 0.10-SNAPSHOT dependency.
>>
>> Cheers,
>> Max
>>
>> On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
>>  wrote:
>> > Hi Max,
>> >
>> > thanks for your quick reply. I found the relevant code and commented it
>> > out
>> > for testing, seems to be working. Happily waiting for the fix. Thanks
>> > again.
>> >
>> > Robert
>> >
>> > On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels 
>> > wrote:
>> >>
>> >> Hi Robert,
>> >>
>> >> This is a regression on the current master due to changes in the way
>> >> Flink calculates the memory and sets the maximum direct memory size.
>> >> We introduced these changes when we merged support for off-heap
>> >> memory. This is not a problem in the way Flink deals with managed
>> >> memory; it's just that -XX:MaxDirectMemorySize is set too low. By default the
>> >> maximum direct memory is only used by the network stack. The network
>> >> library we use allocates more direct memory than we expected.
>> >>
>> >> We'll push a fix to the master as soon as possible. Thank you for
>> >> reporting and thanks for your patience.
>> >>
>> >> Best regards,
>> >> Max
>> >>
>> >> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
>> >>  wrote:
>> >> > Hi everyone,
>> >> >
>> >> > I'm constantly running into OutOfMemoryErrors and for the life of me
>> >> > I
>> >> > cannot figure out what's wrong. Let me describe my setup. I'm running
>> >> > the
>> >> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
>> >> > unfinished implementation of TPC-H Q2
>> >> >
>> >> >
>> >> > (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
>> >> > I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of
>> >> > memory
>> >> > per
>> >> > machine. This is what I believe to be the relevant section of my
>> >> > yarn-site.xml:
>> >> >
>> >> >
>> >> > <property>
>> >> >   <name>yarn.nodemanager.resource.memory-mb</name>
>> >> >   <value>57344</value>
>> >> > </property>
>> >> >
>> >> > <property>
>> >> >   <name>yarn.scheduler.maximum-allocation-mb</name>
>> >> >   <value>55296</value>
>> >> > </property>
>> >> >
>> >> > <property>
>> >> >   <name>yarn.nodemanager.vmem-check-enabled</name>
>> >> >   <value>false</value>
>> >> > </property>
>> >> >
>> >> >
>> >> > And this is how I submit the job:
>> >> >
>> >> >
>> >> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7
>> >> > .
>> >> >
>> >> >
>> >> > The TMs happily report:
>> >> >
>> >> > .
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > -  JVM Options:
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > - -Xms24511m
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > - -Xmx24511m
>> >> > 11:50:15,577 INFO
>> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> > - -XX:MaxDirectMemorySize=65m
>> >> > .
>> >> >
>> >> >
>> >> > I've tried various combinations of YARN and Flink options, to no
>> >> > avail.
>> >> > I
>> >> > always end up with the following stacktrace:
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> >> > java.lang.OutOfMemoryError: Direct buffer memory
>> >> > at
>> >> >
>> >> >
>> >> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >> > at
>> >> >
>> >> >
>> >> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> > at
>> >> >
>> >> >
>> >> > 

Re: All but one TMs connect when JM has more than 16G of memory

2015-10-01 Thread Robert Schmidtke
Hi Robert,

I had a job failure yesterday with what I believe is the setup I have
described above. However when trying to reproduce now, the behavior is the
same: Flink waiting for resources to become available. So no hard error.

Ok, the looping makes sense then. I haven't thought about shared setups.
I'm still figuring out how all parameters play together, i.e. -yn, -yjm,
-ytm and the memory limits in yarn-site.xml. This will need some testing
and I'll come back with a proper description once I think I know what's
going on.
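
As a rough sanity check with the numbers from this thread (a sketch only; it
assumes the JM container lands on the same node as one of the TMs and ignores
YARN's allocation rounding):

object YarnMemoryCheck extends App {
  val nodeLimitMb = 57344 // yarn.nodemanager.resource.memory-mb (56G per node)
  val tmMb        = 40960 // -ytm
  val jmMb        = 16385 // the -yjm value that made the deployment stall (16G + 1M)

  val needed = jmMb + tmMb
  // 57345 MB > 57344 MB: the node hosting both the JM and a TM cannot fit
  // them, so the seventh TaskManager container is never allocated.
  println(s"JM + TM need $needed MB, node offers $nodeLimitMb MB, fits: ${needed <= nodeLimitMb}")
}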

When running Flink on YARN, is it easily possible to place the Flink JM
where the YARN Resource Manager sits, and all the TMs with the remaining
Node Managers?

Robert

On Thu, Oct 1, 2015 at 10:53 AM, Robert Metzger  wrote:

> Hi,
>
> It is interesting to note that when I set both 
> yarn.nodemanager.resource.memory-mb
>> and yarn.scheduler.maximum-allocation-mb to 56G I get a proper error
>> when requesting 56G and 1M, but when setting 
>> yarn.nodemanager.resource.memory-mb
>> to 56G and yarn.scheduler.maximum-allocation-mb to 54G I don't get an
>> error but the aforementioned endless loop.
>
>
> Is it a "hard error" (failing) you're getting, or just "WARN" log messages?
> I'm asking because I added some code some time ago to do some checks
> before deploying Flink on YARN. These checks will print WARN log messages
> if the requested YARN session/job does not fit onto the cluster.
> This "endless loop" exists because in many production environments Flink
> can just wait for resources to become available, for example when other
> containers are finishing.
>
>
> Robert
>
> On Wed, Sep 30, 2015 at 6:33 PM, Robert Schmidtke 
> wrote:
>
>> Hi Robert,
>>
>> thanks for your reply. It got me digging into my setup and I discovered
>> that one TM was scheduled next to the JM. When specifying -yn 7 the
>> documentation suggests that this is the number of TMs (of which I wanted
>> 7), and I thought an additional container would be used for the JM (my YARN
>> cluster has 8 containers). Anyway with this setup the memory added up to
>> 56G and 1M (40G per TM and 16G 1M for the JM), but I set a hard maximum of
>> 56G in my yarn-site.xml which is why the request could not be fulfilled. It
>> is interesting to note that when I set
>> both yarn.nodemanager.resource.memory-mb
>> and yarn.scheduler.maximum-allocation-mb to 56G I get a proper error when
>> requesting 56G and 1M, but when setting yarn.nodemanager.resource.memory-mb
>> to 56G and yarn.scheduler.maximum-allocation-mb to 54G I don't get an error
>> but the aforementioned endless loop. Note I
>> have yarn.nodemanager.vmem-check-enabled set to false. This is probably a
>> YARN issue then / my bad configuration.
>>
>> I'm in a rush now (to get to the Flink meetup) and thus will check the
>> documentation later to see how to deploy the TMs and JM on separate
>> machines each, since that is not what's happening at the moment, but this
>> is what I'd like to have. Thanks again and see you in an hour.
>>
>> Cheers
>> Robert
>>
>> On Wed, Sep 30, 2015 at 5:19 PM, Robert Metzger 
>> wrote:
>>
>>> Hi Robert,
>>>
>>> the problem here is that YARN's scheduler (there are different
>>> schedulers in YARN: FIFO, CapacityScheduler, ...) is not giving Flink's
>>> ApplicationMaster/JobManager all the containers it is requesting. By
>>> increasing the size of the AM/JM container, there is probably no memory
>>> left to fit the last TaskManager container.
>>> I also experienced this issue, when I wanted to run a Flink job on YARN
>>> and the containers were fitting theoretically, but YARN was not giving me
>>> all the containers I requested.
>>> Back then, I asked on the yarn-dev list [1] (there were also some
>>> off-list emails) but we could not resolve the issue.
>>>
>>> Can you check the resource manager logs? Maybe there is a log message
>>> which explains why the container request of Flink's AM is not fulfilled.
>>>
>>>
>>> [1]
>>> http://search-hadoop.com/m/AsBtCilK5r1pKLjf1=Re+QUESTION+Allocating+a+full+YARN+cluster
>>>
>>> On Wed, Sep 30, 2015 at 5:02 PM, Robert Schmidtke <
>>> ro.schmid...@gmail.com> wrote:
>>>
 It's me again. This is a strange issue, I hope I managed to find the
 right keywords. I got 8 machines, 1 for the JM, the other 7 are TMs with
 64G of memory each.

 When running my job like so:

 $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 40960 -yn 7
 .

 The job completes without any problems. When running it like so:

 $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16385 -ytm 40960 -yn 7
 .

 (note the one more M of memory for the JM), the execution stalls,
 continuously reporting:

 .
 TaskManager status (6/7)
 TaskManager status (6/7)
 TaskManager status (6/7)
 .

 I did some poking around, but I couldn't find any direct correlation
 with 

Re: OutOfMemoryError in netty local transport

2015-10-01 Thread Maximilian Michels
Great to hear :)

On Thu, Oct 1, 2015 at 11:21 AM, Robert Schmidtke
 wrote:
> I pulled the current master branch and rebuilt Flink completely anyway.
> Works like a charm.
>
> On Thu, Oct 1, 2015 at 11:11 AM, Maximilian Michels  wrote:
>>
>> By the way, you might have to use the "-U" flag to force Maven to
>> update its dependencies:  mvn -U clean install -DskipTests
>>
>> On Thu, Oct 1, 2015 at 10:19 AM, Robert Schmidtke
>>  wrote:
>> > Sweet! I'll pull it straight away. Thanks!
>> >
>> > On Thu, Oct 1, 2015 at 10:18 AM, Maximilian Michels 
>> > wrote:
>> >>
>> >> Hi Robert,
>> >>
>> >> Just a quick update: The issue has been resolved in the latest Maven
>> >> 0.10-SNAPSHOT dependency.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >> On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
>> >>  wrote:
>> >> > Hi Max,
>> >> >
>> >> > thanks for your quick reply. I found the relevant code and commented
>> >> > it
>> >> > out
>> >> > for testing, seems to be working. Happily waiting for the fix. Thanks
>> >> > again.
>> >> >
>> >> > Robert
>> >> >
>> >> > On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels 
>> >> > wrote:
>> >> >>
>> >> >> Hi Robert,
>> >> >>
>> >> >> This is a regression on the current master due to changes in the way
>> >> >> Flink calculates the memory and sets the maximum direct memory size.
>> >> >> We introduced these changes when we merged support for off-heap
>> >> >> memory. This is not a problem in the way Flink deals with managed
>> >> >> memory; it's just that -XX:MaxDirectMemorySize is set too low. By default the
>> >> >> maximum direct memory is only used by the network stack. The network
>> >> >> library we use allocates more direct memory than we expected.
>> >> >>
>> >> >> We'll push a fix to the master as soon as possible. Thank you for
>> >> >> reporting and thanks for your patience.
>> >> >>
>> >> >> Best regards,
>> >> >> Max
>> >> >>
>> >> >> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
>> >> >>  wrote:
>> >> >> > Hi everyone,
>> >> >> >
>> >> >> > I'm constantly running into OutOfMemoryErrors and for the life of
>> >> >> > me
>> >> >> > I
>> >> >> > cannot figure out what's wrong. Let me describe my setup. I'm
>> >> >> > running
>> >> >> > the
>> >> >> > current master branch of Flink on YARN (Hadoop 2.7.0). My job is
>> >> >> > an
>> >> >> > unfinished implementation of TPC-H Q2
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
>> >> >> > I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of
>> >> >> > memory
>> >> >> > per
>> >> >> > machine. This is what I believe to be the relevant section of my
>> >> >> > yarn-site.xml:
>> >> >> >
>> >> >> >
>> >> >> > <property>
>> >> >> >   <name>yarn.nodemanager.resource.memory-mb</name>
>> >> >> >   <value>57344</value>
>> >> >> > </property>
>> >> >> >
>> >> >> > <property>
>> >> >> >   <name>yarn.scheduler.maximum-allocation-mb</name>
>> >> >> >   <value>55296</value>
>> >> >> > </property>
>> >> >> >
>> >> >> > <property>
>> >> >> >   <name>yarn.nodemanager.vmem-check-enabled</name>
>> >> >> >   <value>false</value>
>> >> >> > </property>
>> >> >> >
>> >> >> >
>> >> >> > And this is how I submit the job:
>> >> >> >
>> >> >> >
>> >> >> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768
>> >> >> > -yn 7
>> >> >> > .
>> >> >> >
>> >> >> >
>> >> >> > The TMs happily report:
>> >> >> >
>> >> >> > .
>> >> >> > 11:50:15,577 INFO
>> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> >> > -  JVM Options:
>> >> >> > 11:50:15,577 INFO
>> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> >> > - -Xms24511m
>> >> >> > 11:50:15,577 INFO
>> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> >> > - -Xmx24511m
>> >> >> > 11:50:15,577 INFO
>> >> >> > org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
>> >> >> > - -XX:MaxDirectMemorySize=65m
>> >> >> > .
>> >> >> >
>> >> >> >
>> >> >> > I've tried various combinations of YARN and Flink options, to no
>> >> >> > avail.
>> >> >> > I
>> >> >> > always end up with the following stacktrace:
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> >> >> > java.lang.OutOfMemoryError: Direct buffer memory
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> > io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> >> > at
>> >> >> >
>> >> >> >
>> >> >> 

Re: All but one TMs connect when JM has more than 16G of memory

2015-10-01 Thread Robert Metzger
Hi,
there is currently no option for forcing certain containers onto specific
machines.
For running the JM (or any other YARN container) on the AM host, you first
need to have a NodeManager running on the host with the RM. Maybe YARN is
smart enough to schedule the small JM container onto that machine.

I don't know your exact setup, but maybe it would make sense for you to run
Flink in the standalone cluster mode instead of with YARN. It seems that you
have a very good idea how and where you want to run the Flink services in
your cluster. YARN is designed to be an abstraction between the cluster and
the application, that's why it's a bit difficult to schedule the containers
to specific machines.

Robert



On Thu, Oct 1, 2015 at 11:24 AM, Robert Schmidtke 
wrote:

> Hi Robert,
>
> I had a job failure yesterday with what I believe is the setup I have
> described above. However when trying to reproduce now, the behavior is the
> same: Flink waiting for resources to become available. So no hard error.
>
> Ok, the looping makes sense then. I haven't thought about shared setups.
> I'm still figuring out how all parameters play together, i.e. -yn, -yjm,
> -ytm and the memory limits in yarn-site.xml. This will need some testing
> and I'll come back with a proper description once I think I know what's
> going on.
>
> When running Flink on YARN, is it easily possible to place the Flink JM
> where the YARN Resource Manager sits, and all the TMs with the remaining
> Node Managers?
>
> Robert
>
> On Thu, Oct 1, 2015 at 10:53 AM, Robert Metzger 
> wrote:
>
>> Hi,
>>
>> It is interesting to note that when I set both 
>> yarn.nodemanager.resource.memory-mb
>>> and yarn.scheduler.maximum-allocation-mb to 56G I get a proper error
>>> when requesting 56G and 1M, but when setting 
>>> yarn.nodemanager.resource.memory-mb
>>> to 56G and yarn.scheduler.maximum-allocation-mb to 54G I don't get an
>>> error but the aforementioned endless loop.
>>
>>
>> Is it a "hard error" (failing) you're getting, or just "WARN" log
>> messages? I'm asking because I added some code some time ago to do some
>> checks before deploying Flink on YARN. These checks will print WARN log
>> messages if the requested YARN session/job does not fit onto the cluster.
>> This "endless loop" exists because in many production environments Flink
>> can just wait for resources to become available, for example when other
>> containers are finishing.
>>
>>
>> Robert
>>
>> On Wed, Sep 30, 2015 at 6:33 PM, Robert Schmidtke > > wrote:
>>
>>> Hi Robert,
>>>
>>> thanks for your reply. It got me digging into my setup and I discovered
>>> that one TM was scheduled next to the JM. When specifying -yn 7 the
>>> documentation suggests that this is the number of TMs (of which I wanted
>>> 7), and I thought an additional container would be used for the JM (my YARN
>>> cluster has 8 containers). Anyway with this setup the memory added up to
>>> 56G and 1M (40G per TM and 16G 1M for the JM), but I set a hard maximum of
>>> 56G in my yarn-site.xml which is why the request could not be fulfilled. It
>>> is interesting to note that when I set
>>> both yarn.nodemanager.resource.memory-mb
>>> and yarn.scheduler.maximum-allocation-mb to 56G I get a proper error when
>>> requesting 56G and 1M, but when setting yarn.nodemanager.resource.memory-mb
>>> to 56G and yarn.scheduler.maximum-allocation-mb to 54G I don't get an error
>>> but the aforementioned endless loop. Note I
>>> have yarn.nodemanager.vmem-check-enabled set to false. This is probably a
>>> YARN issue then / my bad configuration.
>>>
>>> I'm in a rush now (to get to the Flink meetup) and thus will check the
>>> documentation later to see how to deploy the TMs and JM on separate
>>> machines each, since that is not what's happening at the moment, but this
>>> is what I'd like to have. Thanks again and see you in an hour.
>>>
>>> Cheers
>>> Robert
>>>
>>> On Wed, Sep 30, 2015 at 5:19 PM, Robert Metzger 
>>> wrote:
>>>
 Hi Robert,

 the problem here is that YARN's scheduler (there are different
 schedulers in YARN: FIFO, CapacityScheduler, ...) is not giving Flink's
 ApplicationMaster/JobManager all the containers it is requesting. By
 increasing the size of the AM/JM container, there is probably no memory
 left to fit the last TaskManager container.
 I also experienced this issue, when I wanted to run a Flink job on YARN
 and the containers were fitting theoretically, but YARN was not giving me
 all the containers I requested.
 Back then, I asked on the yarn-dev list [1] (there were also some
 off-list emails) but we could not resolve the issue.

 Can you check the resource manager logs? Maybe there is a log message
 which explains why the container request of Flink's AM is not fulfilled.


 [1]