I don't have a build unfortunately, I'm using the maven dependency. I'll try to find a workaround. Thx for your help.

-s

On 20.02.2015 12:44, Robert Metzger wrote:
Hey Sebastian,

I've fixed the issue in this branch:
https://github.com/rmetzger/flink/tree/flink1589:

Configuration c =newConfiguration();
c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,0.5f);
finalExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);


I'll also backport the fix to the release-0.8 branch to make it
available in the 0.8.2 release.

Maybe you can easily cherry-pick the commit to your 0.8.1 Flink build.


Best,
Robert

On Fri, Feb 20, 2015 at 12:11 PM, Robert Metzger <rmetz...@apache.org
<mailto:rmetz...@apache.org>> wrote:

    Hi Sebastian,

    Looks like you've found a limitation of Flink.
    I've already filed two JIRAs to resolve the issue
    (https://issues.apache.org/jira/browse/FLINK-1588,
    https://issues.apache.org/jira/browse/FLINK-1589).

    I don't know your setup, when you use Flink just as a dependency
    without a version being checked out, there is probably no way right
    now to use change the configuration settings.
    Then, you have to start yourself a local cluster
    (./bin/start-local.sh (+ your settings in conf/flink-conf.yaml)).
    You can then either submit your job with ./bin/flink or using the
    RemoteExecutionEnvironment (ExecutionEnvironment.createRemoteEnvironment()).

    If you have the Flink source checked out, you can also hard-code the
    configuration values into org.apache.flink.client.LocalExecutor.


    By the way, Flink 0.8.1 is now available on maven central (I suspect
    you had to build it yourself yesterday evening).
    But given these issues here, it doesn't matter for you anymore ;)


    Best,
    Robert



    On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.o...@googlemail.com
    <mailto:ssc.o...@googlemail.com>> wrote:

        I'm running flink from my IDE, how do change this setting in
        that context?


        On 20.02.2015 11:41, Fabian Hueske wrote:

            Have you tried to increase the heap size by shrinking the
            TM-managed memory?

            Reduce the fraction (taskmanager.memory.fraction) or fix the
            amount of TM memory (taskmanager.memory.size) in the
            flink-config.yaml [1].

            Cheers, Fabian

            [1] http://flink.apache.org/docs/__0.8/config.html
            <http://flink.apache.org/docs/0.8/config.html>


                On 20 Feb 2015, at 11:30, Sebastian
                <ssc.o...@googlemail.com
                <mailto:ssc.o...@googlemail.com>> wrote:

                Hi,

                I get a strange out of memory error from the
                serialization code when I try to run the following program:

                def compute(trackingGraphFile: String, domainIndexFile:
                String,
                   outputPath: String) = {

                implicit val env =
                ExecutionEnvironment.__getExecutionEnvironment

                val edges = GraphUtils.readEdges(__trackingGraphFile)
                val domains = GraphUtils.readVertices(__domainIndexFile)

                val domainsByCompany = DomainsByCompany.mapping
                val companyEdges = edges.filter { edge =>
                     domainsByCompany.contains(__edge.src.toInt) }
                   .map { edge => domainsByCompany(edge.src.__toInt) ->
                edge.target.toInt }
                   .distinct

                val companyBitMaps = companyEdges.groupBy(0).__reduceGroup {
                     domainsByCompany: Iterator[(String,Int)] =>

                     var company = ""
                     val seenAt = new util.BitSet(42889800)

                     for ((name, domain) <- domainsByCompany) {
                       company = name
                       seenAt.set(domain)
                     }

                     company -> seenAt
                   }

                   companyBitMaps.print()

                   env.execute()

                }


                The error looks as follows:


                2015-02-20 11:22:54 INFO  JobClient:345 -
                java.lang.OutOfMemoryError: Java heap space
                         at org.apache.flink.runtime.io
                
<http://org.apache.flink.runtime.io>.__network.serialization.__DataOutputSerializer.resize(__DataOutputSerializer.java:249)
                         at org.apache.flink.runtime.io
                
<http://org.apache.flink.runtime.io>.__network.serialization.__DataOutputSerializer.write(__DataOutputSerializer.java:93)
                         at
                
org.apache.flink.api.java.__typeutils.runtime.__DataOutputViewStream.write(__DataOutputViewStream.java:39)
                         at com.esotericsoftware.kryo.io
                
<http://com.esotericsoftware.kryo.io>.__Output.flush(Output.java:163)
                         at com.esotericsoftware.kryo.io
                
<http://com.esotericsoftware.kryo.io>.__Output.require(Output.java:__142)
                         at com.esotericsoftware.kryo.io
                
<http://com.esotericsoftware.kryo.io>.__Output.writeBoolean(Output.__java:613)
                         at
                
com.twitter.chill.java.__BitSetSerializer.write(__BitSetSerializer.java:42)
                         at
                
com.twitter.chill.java.__BitSetSerializer.write(__BitSetSerializer.java:29)
                         at
                
com.esotericsoftware.kryo.__Kryo.writeClassAndObject(Kryo.__java:599)
                         at
                
org.apache.flink.api.java.__typeutils.runtime.__KryoSerializer.serialize(__KryoSerializer.java:155)
                         at
                
org.apache.flink.api.scala.__typeutils.CaseClassSerializer.__serialize(CaseClassSerializer.__scala:91)
                         at
                
org.apache.flink.api.scala.__typeutils.CaseClassSerializer.__serialize(CaseClassSerializer.__scala:30)
                         at
                
org.apache.flink.runtime.__plugable.__SerializationDelegate.write(__SerializationDelegate.java:51)
                         at org.apache.flink.runtime.io
                
<http://org.apache.flink.runtime.io>.__network.serialization.__SpanningRecordSerializer.__addRecord(__SpanningRecordSerializer.java:__76)
                         at org.apache.flink.runtime.io
                
<http://org.apache.flink.runtime.io>.__network.api.RecordWriter.emit(__RecordWriter.java:82)
                         at
                
org.apache.flink.runtime.__operators.shipping.__OutputCollector.collect(__OutputCollector.java:88)
                         at
                
org.apache.flink.api.scala.__GroupedDataSet$$anon$2.reduce(__GroupedDataSet.scala:262)
                         at
                
org.apache.flink.runtime.__operators.GroupReduceDriver.__run(GroupReduceDriver.java:__124)
                         at
                
org.apache.flink.runtime.__operators.RegularPactTask.run(__RegularPactTask.java:493)
                         at
                
org.apache.flink.runtime.__operators.RegularPactTask.__invoke(RegularPactTask.java:__360)
                         at
                
org.apache.flink.runtime.__execution.RuntimeEnvironment.__run(RuntimeEnvironment.java:__257)
                         at java.lang.Thread.run(Thread.__java:745)

                I run the job locally, giving 2GB of Ram to the VM. The
                code will produce less than 10 groups and the bitsets
                used internally should not be larger than a few megabytes.

                Any tips on how to fix this?

                Best,
                Sebastian

                PS: Still waiting for a reduceGroup that gives me the key ;)






Reply via email to