Re: Memory segment error
Hmm, that is really weird. Can you point me to a branch in your repository and the test case that gives the error? Then I'll have a look at it and try to figure out what's going wrong.

Cheers, Fabian
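The plan Fabian asks for can be printed from the driver program before execution. A minimal sketch, assuming a standard DataSet program; only getExecutionPlan() itself comes from this thread, the surrounding scaffolding is illustrative:

    import org.apache.flink.api.java.ExecutionEnvironment;

    public class PlanDump {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // ... build the job exactly as in the failing program (sources, joins, sinks) ...
            // getExecutionPlan() returns the plan as a JSON string; depending on the
            // version it may consume the registered sinks, so call it in place of
            // execute() while debugging.
            System.out.println(env.getExecutionPlan());
        }
    }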
Re: Memory segment error
Hello,

I went on and did some further debugging on this issue. Even though the exception said that the problem comes from here:

4837 [Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)] ERROR org.apache.flink.runtime.operators.RegularPactTask - Error in task code: Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)
java.lang.Exception: The data preparation for task 'Join(Join at weighEdges(NodeSplitting.java:117))' , caused an error: Too few memory segments provided. Hash Join needs at least 33 memory segments.
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
    at java.lang.Thread.run(Thread.java:745)

which is basically a chain of two joins, a pattern that I have repeated several times, including in the getTriplets() method, and it passed every time. I thought that this could not be right! So I picked each intermediate data set formed, printed it, and added a System.exit(0) afterwards. The exception comes from this method: aggregatePartialValuesSplitVertices. Even though this computes the correct result, it then throws the memory segment exception (and just for the Cluster test - everything else works!). The code in the function is:

    private static DataSet<Vertex<String, Long>> aggregatePartialValuesSplitVertices(
            DataSet<Vertex<String, Long>> resultedVertices) {

        return resultedVertices.flatMap(new FlatMapFunction<Vertex<String, Long>, Vertex<String, Long>>() {

            @Override
            public void flatMap(Vertex<String, Long> vertex,
                    Collector<Vertex<String, Long>> collector) throws Exception {
                int pos = vertex.getId().indexOf("_");

                // if there is a split vertex
                if (pos > -1) {
                    collector.collect(new Vertex<String, Long>(
                            vertex.getId().substring(0, pos), vertex.getValue()));
                } else {
                    collector.collect(vertex);
                }
            }
        }).groupBy(0).reduceGroup(new GroupReduceFunction<Vertex<String, Long>, Vertex<String, Long>>() {

            @Override
            public void reduce(Iterable<Vertex<String, Long>> iterable,
                    Collector<Vertex<String, Long>> collector) throws Exception {
                long sum = 0;
                Vertex<String, Long> vertex = new Vertex<String, Long>();

                Iterator<Vertex<String, Long>> iterator = iterable.iterator();
                while (iterator.hasNext()) {
                    vertex = iterator.next();
                    sum += vertex.getValue();
                }

                collector.collect(new Vertex<String, Long>(vertex.getId(), sum));
            }
        });
    }

To me, nothing seems out of the ordinary here. This is regular user code. And the behaviour in the end is definitely not the one expected. Any idea why this might be happening?

Thanks! Andra

On Fri, Mar 27, 2015 at 12:08 AM, Andra Lungu lungu.an...@gmail.com wrote:
Oops! Sorry! Did not know the mailing list does not support attachments :)
https://gist.github.com/andralungu/fba36d77f79189daa183

On Fri, Mar 27, 2015 at 12:02 AM, Andra Lungu lungu.an...@gmail.com wrote:
Hi Fabian, I uploaded a file with my execution plan.

On Thu, Mar 26, 2015 at 11:50 PM, Fabian Hueske fhue...@gmail.com wrote:
Hi Andra, the error is independent of the size of the data set. A HashTable needs at least 33 memory pages to operate. Since you have 820MB of managed memory and the size of a memory page is 32KB, there should be more than 25k pages available. Can you post the execution plan of the program you execute (ExecutionEnvironment.getExecutionPlan())?
Best, Fabian

2015-03-26 23:31 GMT+01:00 Andra Lungu lungu.an...@gmail.com:
For 20 edges and 5 nodes, that should be more than enough.

On Thu, Mar 26, 2015 at 11:24 PM, Andra Lungu lungu.an...@gmail.com wrote:
Sure, 3470 [main] INFO org.apache.flink.runtime.taskmanager.TaskManager - Using 820 MB for Flink managed memory.

On Thu, Mar 26, 2015 at 4:48 PM, Robert Metzger rmetz...@apache.org wrote:
Hi, during startup, Flink will log something like: 16:48:09,669 INFO org.apache.flink.runtime.taskmanager.TaskManager - Using 1193 MB for Flink managed memory. Can you tell us how much memory Flink is managing in your case?

On Thu, Mar 26, 2015 at 4:46 PM, Andra Lungu lungu.an...@gmail.com wrote:
Hello everyone, I guess I need to revive this old discussion:
http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html
At that point, the fix was to kindly ask Alex to make his project work with 0.9. Now, I am not that lucky! This is the code: https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
The main program (NodeSplitting) is working nicely, I get the correct result. But if you run the
Re: Memory segment error
Sure,

It was in the first mail, but that was sent a while ago :) This is the code: https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
I also added the log4j file in case it helps!

The error is totally reproducible; 2 out of 2 people got the same. Steps to reproduce:
1) Clone the code and switch to the alphaSplit branch
2) Run CounDegreeITCase.java

Hope we can get to the bottom of this! If you need something, just ask.
[jira] [Created] (FLINK-1804) flink-quickstart-scala tests fail on scala-2.11 build profile on travis
Robert Metzger created FLINK-1804:
-------------------------------------
Summary: flink-quickstart-scala tests fail on scala-2.11 build profile on travis
Key: FLINK-1804
URL: https://issues.apache.org/jira/browse/FLINK-1804
Project: Flink
Issue Type: Task
Components: Build System, Quickstarts
Affects Versions: 0.9
Reporter: Robert Metzger

Travis builds on master started failing after the Scala 2.11 profile was added to Flink. For example: https://travis-ci.org/apache/flink/jobs/56312734

The error:
{code}
[INFO] [INFO] --- scala-maven-plugin:3.1.4:compile (default) @ testArtifact ---
[INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype
[INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype-apache
[INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype
[INFO] [INFO] artifact joda-time:joda-time: checking for updates from sonatype-apache
[INFO] [WARNING] Expected all dependencies to require Scala version: 2.10.4
[INFO] [WARNING] com.twitter:chill_2.10:0.5.2 requires scala version: 2.10.4
[INFO] [WARNING] com.twitter:chill-avro_2.10:0.5.2 requires scala version: 2.10.4
[INFO] [WARNING] com.twitter:chill-bijection_2.10:0.5.2 requires scala version: 2.10.4
[INFO] [WARNING] com.twitter:bijection-core_2.10:0.7.2 requires scala version: 2.10.4
[INFO] [WARNING] com.twitter:bijection-avro_2.10:0.7.2 requires scala version: 2.10.4
[INFO] [WARNING] org.scala-lang:scala-reflect:2.10.4 requires scala version: 2.10.4
[INFO] [WARNING] org.apache.flink:flink-scala:0.9-SNAPSHOT requires scala version: 2.10.4
[INFO] [WARNING] org.apache.flink:flink-scala:0.9-SNAPSHOT requires scala version: 2.10.4
[INFO] [WARNING] org.scala-lang:scala-compiler:2.10.4 requires scala version: 2.10.4
[INFO] [WARNING] org.scalamacros:quasiquotes_2.10:2.0.1 requires scala version: 2.10.4
[INFO] [WARNING] org.apache.flink:flink-streaming-scala:0.9-SNAPSHOT requires scala version: 2.11.4
[INFO] [WARNING] Multiple versions of scala libraries detected!
[INFO] [INFO] /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/src/main/scala:-1: info: compiling
[INFO] [INFO] Compiling 3 source files to /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/target/classes at 1427650524446
[INFO] [ERROR] error:
[INFO] [INFO] while compiling: /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/src/main/scala/org/apache/flink/archetypetest/SocketTextStreamWordCount.scala
[INFO] [INFO] during phase: typer
[INFO] [INFO] library version: version 2.10.4
[INFO] [INFO] compiler version: version 2.10.4
[INFO] [INFO] reconstructed args: -d /home/travis/build/apache/flink/flink-quickstart/flink-quickstart-scala/target/test-classes/projects/testArtifact/project/testArtifact/target/classes -classpath
{code}
[jira] [Created] (FLINK-1800) Add a Beta badge in the documentation to components in flink-staging
Robert Metzger created FLINK-1800:
-------------------------------------
Summary: Add a Beta badge in the documentation to components in flink-staging
Key: FLINK-1800
URL: https://issues.apache.org/jira/browse/FLINK-1800
Project: Flink
Issue Type: Task
Reporter: Robert Metzger
Priority: Minor

As per mailing list discussion: http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-a-quot-Beta-quot-badge-in-the-documentation-to-components-in-flink-staging-td4801.html

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Memory segment error
Hi Andra,

I found the cause for the exception. Your test case is simply too complex for our testing environment. We restrict the TM memory for test cases to 80MB in order to execute multiple tests in parallel on Travis.

I counted the memory consumers in your job and got:
- 2 Combine
- 4 GroupReduce
- 4 CoGroup
- 2 Joins
- 1 SolutionSet

Those are quite a few memory consumers for 20MB per slot (4 slots per TM). Do you see a way to reduce the number of operators in your test case, maybe by splitting it in half?
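For reference, the arithmetic behind this: 80MB of managed memory per TaskManager split across 4 slots gives roughly 20MB per slot, and at a page size of 32KB that is 20MB / 32KB = 640 memory pages. Spread over the 13 consumers listed above, that averages out to about 49 pages per consumer, and since the shares are not distributed evenly, a single operator can end up just below the hash join's hard minimum of 33 pages (as the end of this thread confirms, the join got exactly 32).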
[jira] [Created] (FLINK-1801) NetworkEnvironment should start without JobManager association
Stephan Ewen created FLINK-1801:
-------------------------------------
Summary: NetworkEnvironment should start without JobManager association
Key: FLINK-1801
URL: https://issues.apache.org/jira/browse/FLINK-1801
Project: Flink
Issue Type: Sub-task
Components: TaskManager
Affects Versions: 0.9
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 0.9

The NetworkEnvironment should be able to start without a dedicated JobManager association and get one / lose one as the TaskManager connects to different JobManagers.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] Name of Expression API Representation
Great :)

On Sun, Mar 29, 2015 at 7:49 PM, Henry Saputra henry.sapu...@gmail.com wrote:
Thanks for driving the resolution, Aljoscha

On Sun, Mar 29, 2015 at 3:26 AM, Aljoscha Krettek aljos...@apache.org wrote:
I hereby close the vote. Thanks for all your votes! We have 15 votes:
+Relation: 4
+DataTable: 3
+Table: 8
So I will rename ExpressionOperation to Table and finally merge the pull request with the rename and Java support.

On Thu, Mar 26, 2015 at 12:46 PM, Matadorhong hongsi...@huawei.com wrote:
+Table

From: aalexandrov [via Apache Flink (Incubator) Mailing List archive.]
Sent: March 26, 2015, 19:40
To: Hongsibao
Subject: Re: [VOTE] Name of Expression API Representation

+Table

2015-03-26 10:28 GMT+01:00 Robert Metzger [hidden email]:
+Table

On Thu, Mar 26, 2015 at 10:13 AM, Aljoscha Krettek [hidden email] wrote:
Thanks Henry. :D
+Relation

On Thu, Mar 26, 2015 at 9:36 AM, Till Rohrmann [hidden email] wrote:
+Table

On Thu, Mar 26, 2015 at 9:32 AM, Márton Balassi [hidden email] wrote:
+DataTable

On Thu, Mar 26, 2015 at 9:29 AM, Markl, Volker, Prof. Dr. [hidden email] wrote:
+Table
I also agree with that line of argument (think SQL ;-) )

-----Original Message-----
From: Timo Walther
Sent: Thursday, March 26, 2015, 09:28
Subject: Re: [VOTE] Name of Expression API Representation

+Table API
Same thoughts as Stephan. Table is more common in industry than Relation.

On 25.03.2015 21:30, Stephan Ewen wrote:
+Table API / Table
I have a feeling that Relation is a name mostly used by people with a deeper background in (relational) databases, while table is more the pragmatic developer term. (As a reason for my choice)

On 25.03.2015 20:37, Fabian Hueske [hidden email] wrote:
I think the voting scheme is clear. The mail that started the thread says: "The name with the most votes is chosen. If the vote ends with no name having the most votes, a new vote with an alternative voting scheme will be done." So let's go with a single vote and handle corner cases as they appear.

2015-03-25 20:24 GMT+01:00 Ufuk Celebi [hidden email]:
+Table, DataTable
---
How are votes counted? When voting for the name of the project, we didn't vote for one name, but gave a preference ordering. In this case, I am for Table or DataTable, but what happens if I vote for Table and then there is a tie between DataTable and Relation? Will Table count for DataTable then?
- Ufuk

On 25 Mar 2015, at 18:33, Vasiliki Kalavri [hidden email] wrote:
+Relation

On Mar 25, 2015 6:29 PM, Henry Saputra [hidden email] wrote:
+Relation
PS Aljoscha, don't forget to cast your own vote :)

On Wednesday, March 25, 2015, Aljoscha Krettek [hidden email] wrote:
Please vote on the new name of the equivalent to DataSet and DataStream in the new expression-based API. From the previous discussion thread three names emerged: Relation, Table and DataTable. The vote is open for the next 72 hours. The name with the most votes is chosen. If the vote ends with no name having the most votes, a new vote with an alternative voting scheme will be done. Please vote either of these:
+Relation
+Table
+DataTable
[jira] [Created] (FLINK-1799) Scala API does not support generic arrays
Till Rohrmann created FLINK-1799:
-------------------------------------
Summary: Scala API does not support generic arrays
Key: FLINK-1799
URL: https://issues.apache.org/jira/browse/FLINK-1799
Project: Flink
Issue Type: Bug
Reporter: Till Rohrmann
Assignee: Aljoscha Krettek

The Scala API does not support generic arrays at the moment. It throws a rather unhelpful error message: "InvalidTypesException: The given type is not a valid object array". Code to reproduce the problem is given below:

{code}
def main(args: Array[String]) {
  foobar[Double]
}

def foobar[T: ClassTag: TypeInformation]: DataSet[Block[T]] = {
  val tpe = createTypeInformation[Array[T]]
  null
}
{code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] Add a Beta badge in the documentation to components in flink-staging
+1 for using annotations to mark the status of API classes/methods. I think that is very good practice to manage backwards-compatibility.

On Sun, Mar 29, 2015 at 8:20 PM, Henry Saputra henry.sapu...@gmail.com wrote:
+1 to this. Was thinking about the same thing.
- Henry

On Sun, Mar 29, 2015 at 7:38 AM, Robert Metzger rmetz...@apache.org wrote:
Hi, In an offline discussion with other Flink committers, we came up with the idea to mark new components from the flink-staging module with a Beta badge in the documentation. This way, we make it very clear that the component is still under heavy development. If we agree on this, I'll file a JIRA and add the badge to the documentation.
Best, Robert
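A status annotation of the kind proposed here could be as small as the following sketch. The name @Beta, its retention, and its targets are illustrative, not an actual Flink API:

    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    /**
     * Hypothetical marker: flags classes and methods whose API
     * is still evolving and may change between releases.
     */
    @Documented
    @Retention(RetentionPolicy.CLASS)
    @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR })
    public @interface Beta {}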
Re: [DISCUSS] Add a Beta badge in the documentation to components in flink-staging
I filed the JIRA for the beta badge: https://issues.apache.org/jira/browse/FLINK-1800
[jira] [Created] (FLINK-1802) BlobManager directories should be checked before TaskManager startup
Stephan Ewen created FLINK-1802:
-------------------------------------
Summary: BlobManager directories should be checked before TaskManager startup
Key: FLINK-1802
URL: https://issues.apache.org/jira/browse/FLINK-1802
Project: Flink
Issue Type: Sub-task
Components: TaskManager
Affects Versions: 0.9
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 0.9

That allows the call to start the TaskManager to fail early and synchronously, improving debuggability.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
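The "fail early and synchronously" behaviour described in the issue amounts to validating the storage directories during startup rather than on first use. A minimal sketch of such a check; the class name, method name, and wiring are illustrative, not the actual Flink code:

{code}
import java.io.File;
import java.io.IOException;

public class BlobDirectoryCheck {

    // Verify the BLOB storage directory is usable before the TaskManager
    // starts, so that a misconfiguration fails synchronously at startup.
    public static File checkStorageDirectory(String path) throws IOException {
        File dir = new File(path);
        if (!dir.isDirectory() && !dir.mkdirs()) {
            throw new IOException("Cannot create BLOB storage directory: " + dir);
        }
        if (!dir.canWrite()) {
            throw new IOException("BLOB storage directory is not writable: " + dir);
        }
        return dir;
    }
}
{code}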
Extracting detailed Flink execution plan
Hi,

I am trying to extract/retrieve the Flink execution plan. I managed to get it as a JSON string in the following ways:
1. Using the JAR - via PackagedProgram's getPreviewPlan(); or
2. Directly in the program - via ExecutionEnvironment's getExecutionPlan()

My question is: is it possible to retrieve the Plan object directly? I tried, but was not successful, as submitting the jar takes us into interactive mode, and in order to use the other mode, programEntryPoint, the main class needs to implement the Program interface with a getPlan method.

Even if we manage to get the execution plan as a Plan object, will it be different from what we get as the JSON string? For example, in terms of:
1. What datatypes are used in the dataset's tuples
2. On what key the join takes place
3. The filtering predicate
4. The field for distinct, and so on

(The JSON plan does have the operator tree, but the contents field points to the line of code in the class, which is not that helpful.)

If not, is it possible (by some other way) to get the above details just by using the Flink job/jar as an input?

Thanks and Regards
Amit Pawar
[jira] [Created] (FLINK-1805) The class IOManagerAsync(in org.apache.flink.runtime.io.disk.iomanager) should use its own Log
Sibao Hong created FLINK-1805:
-------------------------------------
Summary: The class IOManagerAsync (in org.apache.flink.runtime.io.disk.iomanager) should use its own Log
Key: FLINK-1805
URL: https://issues.apache.org/jira/browse/FLINK-1805
Project: Flink
Issue Type: Bug
Components: Local Runtime
Affects Versions: master
Reporter: Sibao Hong
Assignee: Sibao Hong

Although the class IOManagerAsync extends IOManager in the package org.apache.flink.runtime.io.disk.iomanager, I think it should have its own Log instance.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
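The proposed fix is the standard per-class logger pattern, so that log output is attributed to IOManagerAsync instead of the parent class. A minimal sketch, assuming SLF4J as the logging facade (the surrounding class body is elided):

{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IOManagerAsync extends IOManager {

    // Own logger: messages are now reported under IOManagerAsync
    // rather than under the parent class IOManager.
    private static final Logger LOG = LoggerFactory.getLogger(IOManagerAsync.class);

    // ... existing implementation ...
}
{code}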
Re: Extracting detailed Flink execution plan
Hi Amit!

The DataSet API is basically a fluent builder for the internal DAG of operations, the Plan. This plan is built when you call env.execute(). You can directly get the Plan by calling ExecutionEnvironment#createProgramPlan().

The JSON plan has, in addition, the information inserted by the Optimizer (what partitioning to use where, what keys to use). This is called the OptimizedPlan. To obtain that, you have to push the Plan through the Optimizer:

    OptimizedPlan op = new Optimizer(new DataStatistics(), new DefaultCostEstimator()).compile(plan)

That optimized plan has all the information needed for execution. The JSON is created from that OptimizedPlan via

    new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimizedPlan)

Note: These class names and instructions refer to Flink 0.9. For version 0.8, the names are a bit different.

Greetings, Stephan
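Putting Stephan's steps together, the whole sequence looks roughly like this. The class names are the ones given above for Flink 0.9; the package names are an assumption and may differ between versions:

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.optimizer.DataStatistics;
    import org.apache.flink.optimizer.Optimizer;
    import org.apache.flink.optimizer.costs.DefaultCostEstimator;
    import org.apache.flink.optimizer.plan.OptimizedPlan;
    import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;

    public class PlanInspector {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // ... build the DataSet program (sources, transformations, sinks) ...

            // 1) The raw DAG of operations, as built by the fluent API
            Plan plan = env.createProgramPlan();

            // 2) Run the optimizer to add physical execution information
            //    (partitioning, ship strategies, keys)
            OptimizedPlan optimizedPlan =
                new Optimizer(new DataStatistics(), new DefaultCostEstimator()).compile(plan);

            // 3) Render the optimized plan as JSON
            String json = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optimizedPlan);
            System.out.println(json);
        }
    }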
Re: [DISCUSS] Make a release to be announced at ApacheCon
Okay, I think we have reached consensus on this.

I'll create a RC0 non-voting, preview release candidate for 0.9.0-milestone-1 on Thursday (April 2) this week so that we have a version to test against. Once all issues of RC0 have been resolved, we'll start voting in the week of April 6. (The vote needs to start on April 7 at the latest, so that we have time on Friday to update the website, send the final release files to the mirrors (they need 24 hrs) and Maven central (24 hrs as well). The Monday after that, ApacheCon will start.)

I'll be on vacation at the beginning of next week, but I'm sure Marton or Ufuk can also create RC1 and the VOTE. I'll start documenting the release process in the Wiki, including a list of required verification steps for the VOTE process.

On Fri, Mar 27, 2015 at 4:06 PM, Till Rohrmann trohrm...@apache.org wrote:
+1 for 0.9.0-milestone-1

On Fri, Mar 27, 2015 at 3:52 PM, Kostas Tzoumas ktzou...@apache.org wrote:
+1

On Fri, Mar 27, 2015 at 3:44 PM, Aljoscha Krettek aljos...@apache.org wrote:
+1 for 0.9.0-M1 (or milestone-1)

On Mar 27, 2015 2:45 PM, Ufuk Celebi u...@apache.org wrote:
On Friday, March 27, 2015, Maximilian Michels m...@apache.org wrote:
+1 for 0.9.0-milestone-1
+1
Re: [DISCUSS] Make a release to be announced at ApacheCon
+1

Would be good to have a well-documented release process with all the black magic scripts we have =)

Thanks for driving this, Robert.

- Henry
Re: Travis-CI builds queuing up
It seems that the issue is fixed. I've just pushed two times to a pull request and it immediately started building both. I think the apache user has many more parallel builds available now (we don't have any builds queuing up anymore).

On Thu, Mar 26, 2015 at 4:06 PM, Henry Saputra henry.sapu...@gmail.com wrote:
Awesome news!

On Thursday, March 26, 2015, Robert Metzger rmetz...@apache.org wrote:
Travis replied to me with very good news: somebody from INFRA was asking the same question around the same time as I did, and Travis is working on adding more build capacity for the apache GitHub organization. I hope we'll soon have quicker builds again.

On Tue, Mar 24, 2015 at 4:42 PM, Henry Saputra henry.sapu...@gmail.com wrote:
That's a good idea. It should be good to have a mix of stable Apache Jenkins for master and PRs, and Travis for individual forks.
- Henry

On Tue, Mar 24, 2015 at 8:03 AM, Maximilian Michels m...@apache.org wrote:
Hey! I would also like to continue using Travis, but the current situation is not acceptable because we practically can't use Travis anymore for pull requests or the current master. If it cannot be resolved, then I think we should move on. The builds service team [1] at Apache offers Jenkins [2] for continuous integration. I think it should be fairly simple to set up. We could still use Travis in our forked repositories but have a reliable CI solution for the master and pull requests.
Max
[1] https://builds.apache.org/
[2] http://jenkins-ci.org

On Tue, Mar 24, 2015 at 3:46 PM, Márton Balassi balassi.mar...@gmail.com wrote:
I also like the travis infrastructure. Thanks for bringing this up and reaching out to the travis guys.

On Tue, Mar 24, 2015 at 3:38 PM, Robert Metzger rmetz...@apache.org wrote:
Hi guys, the build queue on travis is getting very very long. It seems that it takes 4 days now until commits to master are built. The nightly builds from the website and the maven snapshots are also delayed by that. Right now, there are 33 pull request builds scheduled (https://travis-ci.org/apache/flink/pull_requests), and 8 builds on master: https://travis-ci.org/apache/flink/builds. The problem is that travis accounts are per GitHub user. In our case, the user is apache, so all ASF projects that have travis enabled share 5 concurrent builders. I would actually like to continue using Travis. The easiest option is probably asking travis if they can give the apache user more build capacity. If that's not possible, we have to look into other options. I'm going to ask Travis if they can do anything about it.
Robert
Re: Memory segment error
Hi Fabian,

I'll see what I can do :). I am just a bit shocked. If this set of coGroups and joins was too much for a test case, how come the following worked?
https://github.com/andralungu/flink/commit/f60b022de056ac259459b68eee6ff0ae9993f0f8
400 lines of complex computations :) And I have an even bigger one for which the test also passed...
Re: Memory segment error
Oh! In that case, who should I refer to? :D

[It's kind of ugly to split this kind of test. I mean if a person is counting the degrees, then that's the result that should be tested - at least in my opinion]

In any case, thanks for the help :)
Re: Memory segment error
Well, each combiner, reducer, join, coGroup, and solution set needs a share of memory (maps and filters don't). In your case it was pretty much at the edge: the hash joins require 33 buffers and got 32. So one memory-consuming operator less might fix it.

I did not look in detail at the other job, but it did not seem that much more complex than this one. As said before, LOC or the total number of operators are not the important thing here. It's the number of memory consumers.

I am not sure how hard the 80MB limit is. Maybe it is possible to increase it a bit.