[jira] [Commented] (FLINK-2394) HadoopOutputFormat OutputCommitter defaults to FileOutputCommitter

2015-07-23 Thread Stefano Bortoli (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14640007#comment-14640007
 ] 

Stefano Bortoli commented on FLINK-2394:


I see. I was in fact a little disappointed with mongo-hadoop for having a 
getOutputCommitter() method that is not standard in the OutputFormat. 
However, my first implementation used the mapred Hadoop API, so no big 
surprises.

I would say a good option could be to either have two HadoopOutputFormatBase 
classes (one per Hadoop version), or to check the Hadoop version and obtain the 
OutputCommitter accordingly.

Meanwhile, I have implemented my own MongoHadoopOutputFormat, extending 
HadoopOutputFormat and overriding the open and close methods to replace the 
FileOutputCommitter with the MongoOutputCommitter. 
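A rough sketch of that workaround, for illustration only: it assumes the Flink mapreduce HadoopOutputFormat can be subclassed with open()/close() overridden, that the wrapper's TaskAttemptContext is reachable from the subclass (getContext() below is a hypothetical accessor), and that mongo-hadoop's MongoOutputCommitter import path and no-arg constructor match the version at hand.

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.mongodb.hadoop.output.MongoOutputCommitter;

// Sketch of the described workaround: swap the committer in open()/close() of a subclass.
public class MongoHadoopOutputFormat<K, V>
        extends org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<K, V> {

    private transient MongoOutputCommitter committer;

    public MongoHadoopOutputFormat(OutputFormat<K, V> mongoOutputFormat, Job job) {
        super(mongoOutputFormat, job);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        // replace the default FileOutputCommitter with the Mongo committer
        this.committer = new MongoOutputCommitter();
        this.committer.setupTask(getContext());      // getContext(): hypothetical accessor
    }

    @Override
    public void close() throws IOException {
        super.close();
        TaskAttemptContext context = getContext();   // hypothetical accessor
        if (this.committer.needsTaskCommit(context)) {
            this.committer.commitTask(context);      // this is the commit that was missing
        }
    }
}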

> HadoopOutputFormat OutputCommitter defaults to FileOutputCommitter
> 
>
> Key: FLINK-2394
> URL: https://issues.apache.org/jira/browse/FLINK-2394
> Project: Flink
>  Issue Type: Bug
>  Components: Hadoop Compatibility
>Affects Versions: 0.9.0
>Reporter: Stefano Bortoli
>
> MongoOutputFormat does not write back to the collection because the 
> HadoopOutputFormat wrapper does not allow setting the MongoOutputCommitter and 
> defaults to FileOutputCommitter. Therefore, on close() and finalizeGlobal() 
> execution the commit does not happen and the Mongo collection stays untouched. 
> A simple solution would be to (see the sketch below):
> 1 - add a constructor to HadoopOutputFormatBase and HadoopOutputFormat 
> that takes the OutputCommitter as a parameter
> 2 - change the outputCommitter field of HadoopOutputFormatBase to the 
> generic OutputCommitter type
> 3 - remove the default assignment of the outputCommitter to FileOutputCommitter 
> in open() and finalizeGlobal(), or keep it as a default in case no specific 
> committer is assigned.
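For illustration, a rough, simplified sketch of the three points above, not the actual Flink classes: the wrapper keeps a generic OutputCommitter that can be supplied via an extra constructor, and FileOutputCommitter is created in open() only as a fallback. The class name and method shapes are stand-ins.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

// Simplified stand-in for HadoopOutputFormatBase, illustrating points 1-3.
public class CommitterAwareOutputFormatSketch {

    private final Job job;
    private OutputCommitter outputCommitter;   // point 2: generic OutputCommitter field

    public CommitterAwareOutputFormatSketch(Job job) {
        this(job, null);                       // existing behavior: committer chosen in open()
    }

    // point 1: constructor that takes the committer (e.g. a MongoOutputCommitter)
    public CommitterAwareOutputFormatSketch(Job job, OutputCommitter committer) {
        this.job = job;
        this.outputCommitter = committer;
    }

    public void open(Path outputPath, TaskAttemptContext context) throws IOException {
        // point 3: FileOutputCommitter only as a default when nothing was supplied
        if (outputCommitter == null) {
            outputCommitter = new FileOutputCommitter(outputPath, context);
        }
        outputCommitter.setupJob(job);         // Job implements JobContext
        outputCommitter.setupTask(context);
    }

    public void finalizeGlobal(TaskAttemptContext context) throws IOException {
        // whichever committer was configured performs the final commit
        outputCommitter.commitJob(job);
    }
}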



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2106) Add outer joins to API, Optimizer, and Runtime

2015-07-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639686#comment-14639686
 ] 

Fabian Hueske commented on FLINK-2106:
--

Hi [~jkovacs] and [~r-pogalz], I had a look at your PR for FLINK-2105. Looks 
very good to me. We should run a few more tests, but after that I am confident 
that we can merge the PR. 
I'm assigning this issue to you.

> Add outer joins to API, Optimizer, and Runtime
> --
>
> Key: FLINK-2106
> URL: https://issues.apache.org/jira/browse/FLINK-2106
> Project: Flink
>  Issue Type: Sub-task
>  Components: Java API, Local Runtime, Optimizer, Scala API
>Reporter: Fabian Hueske
>Assignee: Ricky Pogalz
>Priority: Minor
> Fix For: pre-apache
>
>
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala), to 
> the optimizer, and to the runtime of Flink.
> Initially, the execution strategy should be a sort-merge outer join 
> (FLINK-2105), but it can later be extended to hash joins for left/right outer 
> joins.
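To make the proposed additions concrete, here is a hypothetical usage sketch on the Java DataSet API; the method names (leftOuterJoin and friends) follow the description above and may differ from what the pull request finally introduces.

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical example of the proposed outer join methods on the Java DataSet API.
public class OuterJoinExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> users = env.fromElements(
                Tuple2.of(1, "alice"), Tuple2.of(2, "bob"));
        DataSet<Tuple2<Integer, String>> orders = env.fromElements(
                Tuple2.of(1, "book"), Tuple2.of(3, "pen"));

        // keep every user; the right side is null where no matching order exists
        users.leftOuterJoin(orders)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                    @Override
                    public String join(Tuple2<Integer, String> user, Tuple2<Integer, String> order) {
                        return user.f1 + " -> " + (order == null ? "no order" : order.f1);
                    }
                })
                .print();
    }
}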



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2106) Add outer joins to API, Optimizer, and Runtime

2015-07-23 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-2106:
-
Assignee: Ricky Pogalz

> Add outer joins to API, Optimizer, and Runtime
> --
>
> Key: FLINK-2106
> URL: https://issues.apache.org/jira/browse/FLINK-2106
> Project: Flink
>  Issue Type: Sub-task
>  Components: Java API, Local Runtime, Optimizer, Scala API
>Reporter: Fabian Hueske
>Assignee: Ricky Pogalz
>Priority: Minor
> Fix For: pre-apache
>
>
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala), to 
> the optimizer, and to the runtime of Flink.
> Initially, the execution strategy should be a sort-merge outer join 
> (FLINK-2105), but it can later be extended to hash joins for left/right outer 
> joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639681#comment-14639681
 ] 

ASF GitHub Bot commented on FLINK-2105:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/907#issuecomment-124270766
  
Very nice work! :+1: I had just a few minor style remarks.

It would be nice if you could check the overhead of having all outer join 
cases handled by the same class vs. having a dedicated class for LEFT, RIGHT, 
and FULL OUTER. 

I would also like to run a major regression benchmark to double-check that 
the refactoring of the current sort-merge join didn't cause a performance 
regression. I can take care of that once you have addressed the comments.

If all lights are green after that, I would be happy to merge this PR.


> Implement Sort-Merge Outer Join algorithm
> -
>
> Key: FLINK-2105
> URL: https://issues.apache.org/jira/browse/FLINK-2105
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Ricky Pogalz
>Priority: Minor
> Fix For: pre-apache
>
>
> Flink does not natively support outer joins at the moment. 
> This issue proposes to implement a sort-merge outer join algorithm that can 
> cover left, right, and full outer joins.
> The implementation can be based on the regular sort-merge join iterator 
> ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}; see also 
> the {{MatchDriver}} class).
> The Reusing and NonReusing variants differ in whether object instances are 
> reused or new objects are created. I would start with the NonReusing variant, 
> which is safer from a user's point of view and should also be easier to 
> implement.
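Independent of the concrete Flink iterators named above, the core idea can be summarized with a small, self-contained sketch of a full outer merge over two inputs that are already grouped and sorted by key; the KeyGroup type and the method shape are illustrative only.

import java.util.Iterator;
import java.util.function.BiConsumer;

// Simplified sketch of a sort-merge full outer join over key-sorted, key-grouped inputs.
public class SortMergeOuterJoinSketch {

    // one key together with all of its values from one input
    public static final class KeyGroup<K, V> {
        final K key;
        final Iterable<V> values;
        public KeyGroup(K key, Iterable<V> values) { this.key = key; this.values = values; }
    }

    public static <K extends Comparable<K>, L, R> void fullOuterJoin(
            Iterator<KeyGroup<K, L>> left, Iterator<KeyGroup<K, R>> right, BiConsumer<L, R> emit) {

        KeyGroup<K, L> l = next(left);
        KeyGroup<K, R> r = next(right);

        while (l != null || r != null) {
            int cmp = (l == null) ? 1 : (r == null) ? -1 : l.key.compareTo(r.key);
            if (cmp < 0) {
                // key present only on the left: pad the right side with null
                for (L v : l.values) { emit.accept(v, null); }
                l = next(left);
            } else if (cmp > 0) {
                // key present only on the right: pad the left side with null
                for (R v : r.values) { emit.accept(null, v); }
                r = next(right);
            } else {
                // key present on both sides: emit the cross product of the two value groups
                for (L lv : l.values) { for (R rv : r.values) { emit.accept(lv, rv); } }
                l = next(left);
                r = next(right);
            }
        }
    }

    private static <T> T next(Iterator<T> it) { return it.hasNext() ? it.next() : null; }
}

For a left outer join the cmp > 0 branch would simply advance the right input without emitting, and symmetrically for a right outer join.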



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...

2015-07-23 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/907#issuecomment-124270766
  
Very nice work! :+1: I had just a few minor style remarks.

It would be nice if you could check the overhead of having all outer join 
cases handled by the same class vs. having a dedicated class for LEFT, RIGHT, 
and FULL OUTER. 

I would also like to run a major regression benchmark to double-check that 
the refactoring of the current sort-merge join didn't cause a performance 
regression. I can take care of that once you have addressed the comments.

If all lights are green after that, I would be happy to merge this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...

2015-07-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35384117
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
 ---
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import 
org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+public class ReusingSortMergeOuterJoinIteratorITCase {
+
+   // total memory
+   private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+   private static final int PAGES_FOR_BNLJN = 2;
+
+   // the size of the left and right inputs
+   private static final int INPUT_1_SIZE = 2;
+
+   private static final int INPUT_2_SIZE = 1000;
+
+   // random seeds for the left and right input data generators
+   private static final long SEED1 = 561349061987311L;
+
+   private static final long SEED2 = 231434613412342L;
+
+   // dummy abstract task
+   private final AbstractInvokable parentTask = new DummyInvokable();
+
+   private IOManager ioManager;
+   private MemoryManager memoryManager;
+
+   private TupleTypeInfo> typeInfo1;
+   private TupleTypeInfo> typeInfo2;
+   private TupleSerializer> serializer1;
+   private TupleSerializer> serializer2;
+   private TypeComparator> comparator1;
+   private TypeComparator> comparator2;
+   private TypePairComparator, Tuple2> pairComp;
+
+
+   @Before
+   public void beforeTest() {
+   ExecutionConfig config = new ExecutionConfig();
+   config.disableObjectReuse();
+
+   typeInfo1 = TupleTypeInfo.getBasicTupleTypeInfo(String.class, 
String.class);
+   typeInfo2 = TupleTy

[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639677#comment-14639677
 ] 

ASF GitHub Bot commented on FLINK-2105:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35384117
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
 ---
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import 
org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+public class ReusingSortMergeOuterJoinIteratorITCase {
+
+   // total memory
+   private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+   private static final int PAGES_FOR_BNLJN = 2;
+
+   // the size of the left and right inputs
+   private static final int INPUT_1_SIZE = 2;
+
+   private static final int INPUT_2_SIZE = 1000;
+
+   // random seeds for the left and right input data generators
+   private static final long SEED1 = 561349061987311L;
+
+   private static final long SEED2 = 231434613412342L;
+
+   // dummy abstract task
+   private final AbstractInvokable parentTask = new DummyInvokable();
+
+   private IOManager ioManager;
+   private MemoryManager memoryManager;
+
+   private TupleTypeInfo> typeInfo1;
+   private TupleTypeInfo> typeInfo2;
+   private TupleSerializer> serializer1;
+   private TupleSerializer> serializer2;
+   private TypeComparator> comparator1;
+   private TypeComparator> comparator2;
+   private TypePairComparator, Tuple2> pairComp;
+
+
+   @Before
+   public void bef

[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639640#comment-14639640
 ] 

ASF GitHub Bot commented on FLINK-2105:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35381702
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
 ---
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import 
org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NonReusingSortMergeOuterJoinIteratorITCase {
+
+   // total memory
+   private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+   private static final int PAGES_FOR_BNLJN = 2;
+
+   // the size of the left and right inputs
+   private static final int INPUT_1_SIZE = 2;
+
+   private static final int INPUT_2_SIZE = 1000;
+
+   // random seeds for the left and right input data generators
+   private static final long SEED1 = 561349061987311L;
+
+   private static final long SEED2 = 231434613412342L;
+
+   // dummy abstract task
+   private final AbstractInvokable parentTask = new DummyInvokable();
+
+   private IOManager ioManager;
+   private MemoryManager memoryManager;
+
+   private TupleTypeInfo> typeInfo1;
+   private TupleTypeInfo> typeInfo2;
+   private TupleSerializer> serializer1;
+   pr

[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...

2015-07-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35381702
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
 ---
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import 
org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import 
org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import 
org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NonReusingSortMergeOuterJoinIteratorITCase {
+
+   // total memory
+   private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+   private static final int PAGES_FOR_BNLJN = 2;
+
+   // the size of the left and right inputs
+   private static final int INPUT_1_SIZE = 2;
+
+   private static final int INPUT_2_SIZE = 1000;
+
+   // random seeds for the left and right input data generators
+   private static final long SEED1 = 561349061987311L;
+
+   private static final long SEED2 = 231434613412342L;
+
+   // dummy abstract task
+   private final AbstractInvokable parentTask = new DummyInvokable();
+
+   private IOManager ioManager;
+   private MemoryManager memoryManager;
+
+   private TupleTypeInfo> typeInfo1;
+   private TupleTypeInfo> typeInfo2;
+   private TupleSerializer> serializer1;
+   private TupleSerializer> serializer2;
+   private TypeComparator> comparator1;
+   private TypeComparator> comparator2;
+   private TypePairComparator, Tuple2> pairComp;
+
+
+   @Before
+   public void beforeTest() {
+

[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639621#comment-14639621
 ] 

ASF GitHub Bot commented on FLINK-2105:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35379539
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
+
+
+   public AbstractMergeOuterJoinIterator(
+   OuterJoinType outerJoinType,
+   MutableObjectIterator<T1> input1,
+   MutableObjectIterator<T2> input2,
+   TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+   TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+   TypePairComparator<T1, T2> pairComparator,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   int numMemoryPages,
+   AbstractInvokable parentTask)
+   throws MemoryAllocationException {
+   super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+   this.outerJoinType = outerJoinType;
+   }
+
+   /**
+* Calls the JoinFunction#match() method for all two 
key-value pairs that share the same key and come
+* from different inputs. Furthermore, depending on the outer join type 
(LEFT, RIGHT, FULL), all key-value pairs where no
+* matching partner from the other input exists are joined with null.
+* The output of the match() method is forwarded.
+*
+* @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+* @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+*/
+   @Override
+   public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception {
+   if(it1Empty == null && it2Empty == null) {
+   //first run, set iterators to first elements
+   it1Empty = !this.iterator1.nextKey();
+   it2Empty = !this.iterator2.nextKey();
+   }
+
+   if (it1Empty && it2Empty) {
+   return false;
+   }
+   if (it2Empty) {
+   if(outerJoinType == OuterJoinType.LEFT || outerJoinType 
== OuterJoinType.FULL) {
+   
joinLeftKeyValuesWithNull(ite

[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639619#comment-14639619
 ] 

ASF GitHub Bot commented on FLINK-2105:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35379501
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
+
+
+   public AbstractMergeOuterJoinIterator(
+   OuterJoinType outerJoinType,
+   MutableObjectIterator<T1> input1,
+   MutableObjectIterator<T2> input2,
+   TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+   TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+   TypePairComparator<T1, T2> pairComparator,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   int numMemoryPages,
+   AbstractInvokable parentTask)
+   throws MemoryAllocationException {
+   super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+   this.outerJoinType = outerJoinType;
+   }
+
+   /**
+* Calls the JoinFunction#match() method for all two 
key-value pairs that share the same key and come
+* from different inputs. Furthermore, depending on the outer join type 
(LEFT, RIGHT, FULL), all key-value pairs where no
+* matching partner from the other input exists are joined with null.
+* The output of the match() method is forwarded.
+*
+* @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+* @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+*/
+   @Override
+   public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception {
+   if(it1Empty == null && it2Empty == null) {
+   //first run, set iterators to first elements
+   it1Empty = !this.iterator1.nextKey();
+   it2Empty = !this.iterator2.nextKey();
+   }
+
+   if (it1Empty && it2Empty) {
+   return false;
+   }
+   if (it2Empty) {
--- End diff --

Please use `else if` to make it more obvious that these conditions are 
mutually exclusive.
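For illustration, a minimal, self-contained sketch of the mutually exclusive branching suggested here (not the PR code; the returned strings stand in for the real actions):

// Illustrative only: the same cases written as one if / else-if chain, so it is
// obvious that exactly one branch runs per call.
class BranchingSketch {
    static String nextStep(boolean it1Empty, boolean it2Empty) {
        if (it1Empty && it2Empty) {
            return "done";                        // nothing left on either side
        } else if (it2Empty) {
            return "handle remaining left keys";  // pad with null or drain, depending on join type
        } else if (it1Empty) {
            return "handle remaining right keys"; // pad with null or drain, depending on join type
        } else {
            return "merge the current keys";      // both sides still have keys
        }
    }
}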


> Implement Sort-Merge Outer Join algorit

[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...

2015-07-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35379539
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
+
+
+   public AbstractMergeOuterJoinIterator(
+   OuterJoinType outerJoinType,
+   MutableObjectIterator<T1> input1,
+   MutableObjectIterator<T2> input2,
+   TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+   TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+   TypePairComparator<T1, T2> pairComparator,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   int numMemoryPages,
+   AbstractInvokable parentTask)
+   throws MemoryAllocationException {
+   super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+   this.outerJoinType = outerJoinType;
+   }
+
+   /**
+* Calls the JoinFunction#match() method for all two 
key-value pairs that share the same key and come
+* from different inputs. Furthermore, depending on the outer join type 
(LEFT, RIGHT, FULL), all key-value pairs where no
+* matching partner from the other input exists are joined with null.
+* The output of the match() method is forwarded.
+*
+* @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+* @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+*/
+   @Override
+   public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception {
+   if(it1Empty == null && it2Empty == null) {
+   //first run, set iterators to first elements
+   it1Empty = !this.iterator1.nextKey();
+   it2Empty = !this.iterator2.nextKey();
+   }
+
+   if (it1Empty && it2Empty) {
+   return false;
+   }
+   if (it2Empty) {
+   if(outerJoinType == OuterJoinType.LEFT || outerJoinType 
== OuterJoinType.FULL) {
+   
joinLeftKeyValuesWithNull(iterator1.getValues(), matchFunction, collector);
+   it1Empty = !iterator1.nextKey();
+   return true;
+   }else{
+   //consume rest of left side
+   

[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...

2015-07-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35379501
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
+
+
+   public AbstractMergeOuterJoinIterator(
+   OuterJoinType outerJoinType,
+   MutableObjectIterator<T1> input1,
+   MutableObjectIterator<T2> input2,
+   TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+   TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+   TypePairComparator<T1, T2> pairComparator,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   int numMemoryPages,
+   AbstractInvokable parentTask)
+   throws MemoryAllocationException {
+   super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+   this.outerJoinType = outerJoinType;
+   }
+
+   /**
+* Calls the JoinFunction#match() method for all two 
key-value pairs that share the same key and come
+* from different inputs. Furthermore, depending on the outer join type 
(LEFT, RIGHT, FULL), all key-value pairs where no
+* matching partner from the other input exists are joined with null.
+* The output of the match() method is forwarded.
+*
+* @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+* @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+*/
+   @Override
+   public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception {
+   if(it1Empty == null && it2Empty == null) {
+   //first run, set iterators to first elements
+   it1Empty = !this.iterator1.nextKey();
+   it2Empty = !this.iterator2.nextKey();
+   }
+
+   if (it1Empty && it2Empty) {
+   return false;
+   }
+   if (it2Empty) {
--- End diff --

Please use `else if` to make it more obvious that these conditions are 
mutually exclusive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA

[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...

2015-07-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35379150
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
--- End diff --

Please make `outerJoinType` `final`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639617#comment-14639617
 ] 

ASF GitHub Bot commented on FLINK-2105:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35379150
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
--- End diff --

Please make `outerJoinType` `final`.


> Implement Sort-Merge Outer Join algorithm
> -
>
> Key: FLINK-2105
> URL: https://issues.apache.org/jira/browse/FLINK-2105
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Ricky Pogalz
>Priority: Minor
> Fix For: pre-apache
>
>
> Flink does not natively support outer joins at the moment. 
> This issue proposes to implement a sort-merge outer join algorithm that can 
> cover left, right, and full outer joins.
> The implementation can be based on the regular sort-merge join iterator 
> ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}; see also 
> the {{MatchDriver}} class).
> The Reusing and NonReusing variants differ in whether object instances are 
> reused or new objects are created. I would start with the NonReusing variant, 
> which is safer from a user's point of view and should also be easier to 
> implement.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...

2015-07-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35378409
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
+
+
+   public AbstractMergeOuterJoinIterator(
+   OuterJoinType outerJoinType,
+   MutableObjectIterator<T1> input1,
+   MutableObjectIterator<T2> input2,
+   TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+   TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+   TypePairComparator<T1, T2> pairComparator,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   int numMemoryPages,
+   AbstractInvokable parentTask)
+   throws MemoryAllocationException {
+   super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+   this.outerJoinType = outerJoinType;
+   }
+
+   /**
+* Calls the JoinFunction#match() method for all two 
key-value pairs that share the same key and come
+* from different inputs. Furthermore, depending on the outer join type 
(LEFT, RIGHT, FULL), all key-value pairs where no
+* matching partner from the other input exists are joined with null.
+* The output of the match() method is forwarded.
+*
+* @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+* @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+*/
+   @Override
+   public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception {
+   if(it1Empty == null && it2Empty == null) {
--- End diff --

You can check the first call with another `boolean` flag. Checking against 
a non-initialized variable is not so nice, IMO.
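A minimal sketch of that suggestion (illustrative, not the PR code): track the first call with a dedicated flag so the emptiness fields can stay plain booleans instead of nullable Boolean objects.

// Illustrative sketch: a separate "initialized" flag instead of nullable Boolean fields.
class FirstCallFlagSketch {
    private boolean initialized = false;   // replaces the "Boolean == null" check
    private boolean it1Empty;
    private boolean it2Empty;

    boolean callWithNextKey() {
        if (!initialized) {
            // first run: position both inputs on their first keys
            it1Empty = !nextKey1();
            it2Empty = !nextKey2();
            initialized = true;
        }
        return !(it1Empty && it2Empty);
    }

    // stand-ins for iterator1.nextKey() / iterator2.nextKey()
    private boolean nextKey1() { return false; }
    private boolean nextKey2() { return false; }
}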


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639605#comment-14639605
 ] 

ASF GitHub Bot commented on FLINK-2105:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35378409
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
+
+
+   public AbstractMergeOuterJoinIterator(
+   OuterJoinType outerJoinType,
+   MutableObjectIterator<T1> input1,
+   MutableObjectIterator<T2> input2,
+   TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1,
+   TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2,
+   TypePairComparator<T1, T2> pairComparator,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   int numMemoryPages,
+   AbstractInvokable parentTask)
+   throws MemoryAllocationException {
+   super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+   this.outerJoinType = outerJoinType;
+   }
+
+   /**
+* Calls the JoinFunction#match() method for all two 
key-value pairs that share the same key and come
+* from different inputs. Furthermore, depending on the outer join type 
(LEFT, RIGHT, FULL), all key-value pairs where no
+* matching partner from the other input exists are joined with null.
+* The output of the match() method is forwarded.
+*
+* @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+* @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+*/
+   @Override
+   public boolean callWithNextKey(final FlatJoinFunction 
matchFunction, final Collector collector) throws Exception {
+   if(it1Empty == null && it2Empty == null) {
--- End diff --

You can check the first call with another `boolean` flag. Checking against 
a non-initialized variable is not so nice, IMO.


> Implement Sort-Merge Outer Join algorithm
> -
>
> Key: FLINK-2105
> URL: https://issues.apache.org/jira/browse/FLINK-2105
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Ricky Po

[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...

2015-07-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35378056
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator extends 
AbstractMergeIterator {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
+
+
+   public AbstractMergeOuterJoinIterator(
+   OuterJoinType outerJoinType,
+   MutableObjectIterator input1,
+   MutableObjectIterator input2,
+   TypeSerializer serializer1, TypeComparator 
comparator1,
+   TypeSerializer serializer2, TypeComparator 
comparator2,
+   TypePairComparator pairComparator,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   int numMemoryPages,
+   AbstractInvokable parentTask)
+   throws MemoryAllocationException {
+   super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+   this.outerJoinType = outerJoinType;
+   }
+
+   /**
+* Calls the JoinFunction#match() method for all two 
key-value pairs that share the same key and come
+* from different inputs. Furthermore, depending on the outer join type 
(LEFT, RIGHT, FULL), all key-value pairs where no
+* matching partner from the other input exists are joined with null.
+* The output of the match() method is forwarded.
+*
+* @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+* @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+*/
+   @Override
+   public boolean callWithNextKey(final FlatJoinFunction 
matchFunction, final Collector collector) throws Exception {
+   if(it1Empty == null && it2Empty == null) {
+   //first run, set iterators to first elements
+   it1Empty = !this.iterator1.nextKey();
+   it2Empty = !this.iterator2.nextKey();
+   }
+
+   if (it1Empty && it2Empty) {
+   return false;
+   }
+   if (it2Empty) {
+   if(outerJoinType == OuterJoinType.LEFT || outerJoinType 
== OuterJoinType.FULL) {
+   
joinLeftKeyValuesWithNull(iterator1.getValues(), matchFunction, collector);
+   it1Empty = !iterator1.nextKey();
+   return true;
+   }else{
+   //consume rest of left side
+   

[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639596#comment-14639596
 ] 

ASF GitHub Bot commented on FLINK-2105:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35377999
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator extends 
AbstractMergeIterator {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
+
+
+   public AbstractMergeOuterJoinIterator(
+   OuterJoinType outerJoinType,
+   MutableObjectIterator input1,
+   MutableObjectIterator input2,
+   TypeSerializer serializer1, TypeComparator 
comparator1,
+   TypeSerializer serializer2, TypeComparator 
comparator2,
+   TypePairComparator pairComparator,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   int numMemoryPages,
+   AbstractInvokable parentTask)
+   throws MemoryAllocationException {
+   super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+   this.outerJoinType = outerJoinType;
--- End diff --

please initialize {{it1Empty}} and {{it2Empty}}.


> Implement Sort-Merge Outer Join algorithm
> -
>
> Key: FLINK-2105
> URL: https://issues.apache.org/jira/browse/FLINK-2105
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Ricky Pogalz
>Priority: Minor
> Fix For: pre-apache
>
>
> Flink does not natively support outer joins at the moment. 
> This issue proposes to implement a sort-merge outer join algorithm that can 
> cover left, right, and full outer joins.
> The implementation can be based on the regular sort-merge join iterator 
> ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}, see also 
> {{MatchDriver}} class)
> The Reusing and NonReusing variants differ in whether object instances are 
> reused or new objects are created. I would start with the NonReusing variant 
> which is safer from a user's point of view and should also be easier to 
> implement.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639598#comment-14639598
 ] 

ASF GitHub Bot commented on FLINK-2105:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35378056
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator extends 
AbstractMergeIterator {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
+
+
+   public AbstractMergeOuterJoinIterator(
+   OuterJoinType outerJoinType,
+   MutableObjectIterator input1,
+   MutableObjectIterator input2,
+   TypeSerializer serializer1, TypeComparator 
comparator1,
+   TypeSerializer serializer2, TypeComparator 
comparator2,
+   TypePairComparator pairComparator,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   int numMemoryPages,
+   AbstractInvokable parentTask)
+   throws MemoryAllocationException {
+   super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+   this.outerJoinType = outerJoinType;
+   }
+
+   /**
+* Calls the JoinFunction#match() method for all two 
key-value pairs that share the same key and come
+* from different inputs. Furthermore, depending on the outer join type 
(LEFT, RIGHT, FULL), all key-value pairs where no
+* matching partner from the other input exists are joined with null.
+* The output of the match() method is forwarded.
+*
+* @throws Exception Forwards all exceptions from the user code and the 
I/O system.
+* @see 
org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction,
 org.apache.flink.util.Collector)
+*/
+   @Override
+   public boolean callWithNextKey(final FlatJoinFunction 
matchFunction, final Collector collector) throws Exception {
+   if(it1Empty == null && it2Empty == null) {
+   //first run, set iterators to first elements
+   it1Empty = !this.iterator1.nextKey();
+   it2Empty = !this.iterator2.nextKey();
+   }
+
+   if (it1Empty && it2Empty) {
+   return false;
+   }
+   if (it2Empty) {
+   if(outerJoinType == OuterJoinType.LEFT || outerJoinType 
== OuterJoinType.FULL) {
+   
joinLeftKeyValuesWithNull(ite

[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639595#comment-14639595
 ] 

ASF GitHub Bot commented on FLINK-2105:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35377964
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator extends 
AbstractMergeIterator {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
--- End diff --

why aren't you using the {{boolean}} primitive type?


> Implement Sort-Merge Outer Join algorithm
> -
>
> Key: FLINK-2105
> URL: https://issues.apache.org/jira/browse/FLINK-2105
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Ricky Pogalz
>Priority: Minor
> Fix For: pre-apache
>
>
> Flink does not natively support outer joins at the moment. 
> This issue proposes to implement a sort-merge outer join algorithm that can 
> cover left, right, and full outer joins.
> The implementation can be based on the regular sort-merge join iterator 
> ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}, see also 
> {{MatchDriver}} class)
> The Reusing and NonReusing variants differ in whether object instances are 
> reused or new objects are created. I would start with the NonReusing variant 
> which is safer from a user's point of view and should also be easier to 
> implement.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...

2015-07-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35377999
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator extends 
AbstractMergeIterator {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
+   private Boolean it2Empty;
+
+   private OuterJoinType outerJoinType;
+
+
+   public AbstractMergeOuterJoinIterator(
+   OuterJoinType outerJoinType,
+   MutableObjectIterator input1,
+   MutableObjectIterator input2,
+   TypeSerializer serializer1, TypeComparator 
comparator1,
+   TypeSerializer serializer2, TypeComparator 
comparator2,
+   TypePairComparator pairComparator,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   int numMemoryPages,
+   AbstractInvokable parentTask)
+   throws MemoryAllocationException {
+   super(input1, input2, serializer1, comparator1, serializer2, 
comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, 
parentTask);
+
+   this.outerJoinType = outerJoinType;
--- End diff --

please initialize {{it1Empty}} and {{it2Empty}}.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...

2015-07-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/907#discussion_r35377964
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link 
org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ * */
+public abstract class AbstractMergeOuterJoinIterator extends 
AbstractMergeIterator {
+
+   public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+   private Boolean it1Empty;
--- End diff --

why aren't you using the {{boolean}} primitive type?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter

2015-07-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639545#comment-14639545
 ] 

Fabian Hueske commented on FLINK-2394:
--

Hadoop handles {{OutputCommitters}} differently in its two APIs ({{mapred}} and 
{{mapreduce}}).

- For the newer {{mapreduce}} API, things are quite simple.
{{org.apache.hadoop.mapreduce.OutputFormat}} has an abstract method 
{{getOutputCommitter(TaskAttemptContext context)}} which provides the output 
committer that must be used. So updating the Flink 
{{mapreduce.HadoopOutputFormatBase}} class such that it gets the 
{{OutputCommitter}} directly from the {{OutputFormat}} instead of creating a 
{{FileOutputCommitter}} should be easy.

- For the older {{mapred}} API, the OutputFormat interface does not define a 
{{getOutputCommitter()}} method. Instead the {{JobConf}} has a 
{{getOutputCommitter()}} method which returns a {{FileOutputCommitter}} if 
nothing else is defined. So we could update the 
{{mapred.HadoopOutputFormatBase}} to get the {{OutputCommitter}} from the 
{{JobConf}}. However, this would require that users manually set a different 
{{OutputCommitter}} in the {{JobConf}}, which is not really intuitive. We could 
offer an additional constructor that sets an {{OutputCommitter}} in the 
provided {{JobConf}} argument, as [~stefano.bortoli] suggested.
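A rough sketch of the {{mapred}}-side idea, combining the {{JobConf}} lookup with the
suggested extra constructor. The class below is illustrative only (it is not the actual
{{HadoopOutputFormatBase}}), but the Hadoop calls ({{JobConf.setOutputCommitter()}} and
{{JobConf.getOutputCommitter()}}) are the real {{mapred}} API.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCommitter;

// Illustrative wrapper, not the real Flink class: the committer class is passed in,
// registered in the JobConf, and later resolved through the JobConf instead of
// hard-coding a FileOutputCommitter.
public class CommitterAwareOutputFormatSketch {

    private final JobConf jobConf;
    private transient OutputCommitter outputCommitter;

    public CommitterAwareOutputFormatSketch(JobConf jobConf,
                                            Class<? extends OutputCommitter> committerClass) {
        this.jobConf = jobConf;
        // overrides the mapred default (FileOutputCommitter)
        this.jobConf.setOutputCommitter(committerClass);
    }

    public void open() {
        // whatever is configured in the JobConf wins; FileOutputCommitter is only the fallback
        this.outputCommitter = this.jobConf.getOutputCommitter();
        // open()/close()/finalizeGlobal() would then call setupTask()/commitTask()/commitJob()
        // on this committer instead of on a hard-coded FileOutputCommitter.
    }
}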

> HadoopOutFormat OutputCommitter is default to FileOutputCommiter
> 
>
> Key: FLINK-2394
> URL: https://issues.apache.org/jira/browse/FLINK-2394
> Project: Flink
>  Issue Type: Bug
>  Components: Hadoop Compatibility
>Affects Versions: 0.9.0
>Reporter: Stefano Bortoli
>
> MongoOutputFormat does not write back in collection because the 
> HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and 
> is set as default to FileOutputCommitter. Therefore, on close and 
> globalFinalize execution the commit does not happen and mongo collection 
> stays untouched. 
> A simple solution would be to:
> 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat 
> that gets the OutputCommitter as a parameter
> 2 - change the outputCommitter field of HadoopOutputFormatBase to be a 
> generic OutputCommitter
> 3 - remove the default assignment in the open() and finalizeGlobal to the 
> outputCommitter to FileOutputCommitter(), or keep it as a default in case of 
> no specific assignment.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2403) Rework partitioned state checkpointing logic

2015-07-23 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-2403:
-

 Summary: Rework partitioned state checkpointing logic
 Key: FLINK-2403
 URL: https://issues.apache.org/jira/browse/FLINK-2403
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Gyula Fora


In case of partitioned states, states are stored per-key (creating a 
statehandle for each key) so that we can later use this for repartitioning 
without fetching all the states. 

While this mechanism might be desirable later, it is extremely inefficient with a 
state backend such as HDFS, because we end up creating many small files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (FLINK-2152) Provide zipWithIndex utility in flink-contrib

2015-07-23 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes Günther reopened FLINK-2152:
-

When using zipWithIndex in a longer-running job, the following error occurs:


java.lang.Exception: The user defined 'open()' method caused an exception: null
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:366)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.sort(ArrayList.java:1456)
at java.util.Collections.sort(Collections.java:175)
at 
org.apache.flink.api.java.utils.DataSetUtils$2.open(DataSetUtils.java:80)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33)
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:492)
... 3 more
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:93)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an 
exception: null
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:366)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.sort(ArrayList.java:1456)
at java.util.Collections.sort(Collections.java:175)
at 
org.apache.flink.api.java.utils.DataSetUtils$2.open(DataSetUtils.java:80)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33)
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:492)
... 3 more


This can be fixed by wrapping a concurrent list around the counts variable, 
e.g. a CopyOnWriteArrayList from java.util.concurrent (in the open() method).
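A minimal sketch of that workaround (the field and method below are hypothetical, not the
actual {{DataSetUtils}} code): copy the shared list into a {{CopyOnWriteArrayList}} inside
{{open()}} before sorting, so a concurrent writer can no longer trigger the
ConcurrentModificationException during the sort. Sorting a plain defensive copy
(new ArrayList<>(counts)) would have the same effect.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical open()-style method; 'counts' stands for the shared list that was
// being sorted while another thread could still modify it.
public class ZipWithIndexOpenSketch {

    private List<Long> counts;

    public void open() {
        // snapshot the shared list into a concurrency-safe copy, then sort the copy
        List<Long> safeCounts = new CopyOnWriteArrayList<>(this.counts);
        Collections.sort(safeCounts);
        this.counts = safeCounts;
        // ... derive the start index for this subtask from the sorted counts ...
    }
}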

> Provide zipWithIndex utility in flink-contrib
> -
>
> Key: FLINK-2152
> URL: https://issues.apache.org/jira/browse/FLINK-2152
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Robert Metzger
>Assignee: Andra Lungu
>Priority: Trivial
>  Labels: starter
> Fix For: 0.10
>
>
> We should provide a simple utility method for zipping elements in a data set 
> with a dense index.
> its up for discussion whether we want it directly in the API or if we should 
> provide it only as a utility from {{flink-contrib}}.
> I would put it in {{flink-contrib}}.
> See my answer on SO: 
> http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2391) Storm-compatibility:method FlinkTopologyBuilder.createTopology() throws java.lang.NullPointerException

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639389#comment-14639389
 ] 

ASF GitHub Bot commented on FLINK-2391:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/932#issuecomment-124218025
  
Your second commit reverted your first commit. I don't see any changes. 
"File changed 0".


> Storm-compatibility:method FlinkTopologyBuilder.createTopology() throws 
> java.lang.NullPointerException
> --
>
> Key: FLINK-2391
> URL: https://issues.apache.org/jira/browse/FLINK-2391
> Project: Flink
>  Issue Type: Bug
>  Components: flink-contrib
>Affects Versions: 0.10
> Environment: win7 32bit;linux
>Reporter: Huang Wei
>  Labels: features
> Fix For: 0.10
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> core dumped at FlinkOutputFieldsDeclarer.java : 160(package 
> FlinkOutputFieldsDeclarer).
> code : fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i));
> in this line, the var this.outputSchema may be null.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2391]Fix Storm-compatibility FlinkTopol...

2015-07-23 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/932#issuecomment-124218025
  
Your second commit reverted your first commit. I don't see any changes. 
"File changed 0".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2398) Decouple StreamGraph Building from the API

2015-07-23 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639107#comment-14639107
 ] 

Aljoscha Krettek commented on FLINK-2398:
-

But what would go into {{streaming-core}} and {{streaming-runtime}}?  
{{streaming-core}} is, right now, a (more or less) small layer on top of 
{{flink-core}} and {{flink-runtime}}. It is not very clear what would go where, IMHO.

> Decouple StreamGraph Building from the API
> --
>
> Key: FLINK-2398
> URL: https://issues.apache.org/jira/browse/FLINK-2398
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Currently, the building of the StreamGraph is very intertwined with the API 
> methods. DataStream knows about the StreamGraph and keeps track of splitting, 
> selected names, unions and so on. This leads to the problem that it is very 
> hard to understand how the StreamGraph is built because the code that does it 
> is all over the place. This also makes it hard to extend/change parts of the 
> Streaming system.
> I propose to introduce "Transformations". A transformation holds information 
> about one operation: The input streams, types, names, operator and so on. An 
> API method creates a transformation instead of fiddling with the StreamGraph 
> directly. A new component, the StreamGraphGenerator creates a StreamGraph 
> from the tree of transformations that result from program specification using 
> the API methods. This would relieve DataStream from knowing about the 
> StreamGraph and makes unions, splitting, selection visible transformations 
> instead of being scattered across the different API classes as fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2402) Add a non-blocking BarrierTracker

2015-07-23 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2402:
---

 Summary: Add a non-blocking BarrierTracker
 Key: FLINK-2402
 URL: https://issues.apache.org/jira/browse/FLINK-2402
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 0.10


This issue would add a new tracker for barriers that simply tracks what 
barriers have been observed from which inputs. It never blocks off buffers.
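As a rough illustration of that behaviour (this is not the actual Flink class, and the
method names are made up): count, per checkpoint ID, from how many input channels a barrier
has been seen, and report the checkpoint as complete once all channels have delivered it,
without ever buffering or blocking records.

import java.util.HashMap;
import java.util.Map;

// Illustrative barrier counting, never blocking or buffering any input.
public class BarrierCountingSketch {

    private final int numberOfInputChannels;
    private final Map<Long, Integer> barrierCounts = new HashMap<>();

    public BarrierCountingSketch(int numberOfInputChannels) {
        this.numberOfInputChannels = numberOfInputChannels;
    }

    /** Returns true once the barrier for this checkpoint has arrived on all channels. */
    public boolean processBarrier(long checkpointId) {
        int seen = barrierCounts.getOrDefault(checkpointId, 0) + 1;
        if (seen == numberOfInputChannels) {
            barrierCounts.remove(checkpointId);
            return true;
        }
        barrierCounts.put(checkpointId, seen);
        return false;
    }
}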



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2398) Decouple StreamGraph Building from the API

2015-07-23 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639094#comment-14639094
 ] 

Stephan Ewen commented on FLINK-2398:
-

What would be great down the line is the following:

  - Have the {{streaming-core}} and {{streaming-example}} independent of the 
runtime, much like for the batch API.

  - Add the runtime related streaming classes to {{flink-runtime}}.


> Decouple StreamGraph Building from the API
> --
>
> Key: FLINK-2398
> URL: https://issues.apache.org/jira/browse/FLINK-2398
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Currently, the building of the StreamGraph is very intertwined with the API 
> methods. DataStream knows about the StreamGraph and keeps track of splitting, 
> selected names, unions and so on. This leads to the problem that it is very 
> hard to understand how the StreamGraph is built because the code that does it 
> is all over the place. This also makes it hard to extend/change parts of the 
> Streaming system.
> I propose to introduce "Transformations". A transformation holds information 
> about one operation: The input streams, types, names, operator and so on. An 
> API method creates a transformation instead of fiddling with the StreamGraph 
> directly. A new component, the StreamGraphGenerator creates a StreamGraph 
> from the tree of transformations that result from program specification using 
> the API methods. This would relieve DataStream from knowing about the 
> StreamGraph and makes unions, splitting, selection visible transformations 
> instead of being scattered across the different API classes as fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2385) Scala DataSet.distinct should have parenthesis

2015-07-23 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2385.
-
Resolution: Fixed

Fixed via 5d475f88784f796e054d7cff2492b9989d3f47d6

> Scala DataSet.distinct should have parenthesis
> --
>
> Key: FLINK-2385
> URL: https://issues.apache.org/jira/browse/FLINK-2385
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> The method is not a side-effect free accessor, but defines heavy computation, 
> even if it does not mutate the original data set.
> This is a somewhat API breaking change.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2385) Scala DataSet.distinct should have parenthesis

2015-07-23 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2385.
---

> Scala DataSet.distinct should have parenthesis
> --
>
> Key: FLINK-2385
> URL: https://issues.apache.org/jira/browse/FLINK-2385
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> The method is not a side-effect free accessor, but defines heavy computation, 
> even if it does not mutate the original data set.
> This is a somewhat API breaking change.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2385) Scala DataSet.distinct should have parenthesis

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639041#comment-14639041
 ] 

ASF GitHub Bot commented on FLINK-2385:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/933


> Scala DataSet.distinct should have parenthesis
> --
>
> Key: FLINK-2385
> URL: https://issues.apache.org/jira/browse/FLINK-2385
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> The method is not a side-effect free accessor, but defines heavy computation, 
> even if it does not mutate the original data set.
> This is a somewhat API breaking change.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2385] [scala api] [api breaking] Add pa...

2015-07-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/933


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2401) Replace ActorRefs with ActorGateway in web server

2015-07-23 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-2401:


 Summary: Replace ActorRefs with ActorGateway in web server
 Key: FLINK-2401
 URL: https://issues.apache.org/jira/browse/FLINK-2401
 Project: Flink
  Issue Type: Improvement
Reporter: Till Rohrmann
Priority: Minor


The web server is the only remaining component which uses {{ActorRefs}} 
directly to communicate with Flink actors. They should be replaced by 
{{ActorGateways}} which allow the automatic decoration of messages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2332) Assign session IDs to JobManager and TaskManager messages

2015-07-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-2332.

Resolution: Fixed

Added via 45428518d0e1b843947a6184b4a803a78ad5

> Assign session IDs to JobManager and TaskManager messages
> -
>
> Key: FLINK-2332
> URL: https://issues.apache.org/jira/browse/FLINK-2332
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 0.10
>
>
> In order to support true high availability {{TaskManager}} and {{JobManager}} 
> have to be able to distinguish whether a message was sent from the leader or 
> whether a message was sent from a former leader. Messages which come from a 
> former leader have to be discarded in order to guarantee a consistent state.
> A way to achieve this is to assign a leader session ID to a {{JobManager}} 
> once it is elected as leader. This leader session ID is sent to the 
> {{TaskManager}} upon registration at the {{JobManager}}. All subsequent 
> messages should then be decorated with this leader session ID to mark them as 
> valid. On the {{TaskManager}} side, the leader session ID received in response 
> to the registration message can then be used to validate incoming messages.
> The same holds true for registration messages which should have a 
> registration session ID, too. That way, it is possible to distinguish invalid 
> registration messages from valid ones. The registration session ID can be 
> assigned once the TaskManager is notified about the new leader.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/917


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2332) Assign session IDs to JobManager and TaskManager messages

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639028#comment-14639028
 ] 

ASF GitHub Bot commented on FLINK-2332:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/917


> Assign session IDs to JobManager and TaskManager messages
> -
>
> Key: FLINK-2332
> URL: https://issues.apache.org/jira/browse/FLINK-2332
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 0.10
>
>
> In order to support true high availability {{TaskManager}} and {{JobManager}} 
> have to be able to distinguish whether a message was sent from the leader or 
> whether a message was sent from a former leader. Messages which come from a 
> former leader have to be discarded in order to guarantee a consistent state.
> A way to achieve this is to assign a leader session ID to a {{JobManager}} 
> once it is elected as leader. This leader session ID is sent to the 
> {{TaskManager}} upon registration at the {{JobManager}}. All subsequent 
> messages should then be decorated with this leader session ID to mark them as 
> valid. On the {{TaskManager}} side, the leader session ID received in response 
> to the registration message can then be used to validate incoming messages.
> The same holds true for registration messages which should have a 
> registration session ID, too. That way, it is possible to distinguish invalid 
> registration messages from valid ones. The registration session ID can be 
> assigned once the TaskManager is notified about the new leader.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-23 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-124143378
  
Tests pass on Travis. Will merge it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2332) Assign session IDs to JobManager and TaskManager messages

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639018#comment-14639018
 ] 

ASF GitHub Bot commented on FLINK-2332:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-124143378
  
Tests pass on Travis. Will merge it.


> Assign session IDs to JobManager and TaskManager messages
> -
>
> Key: FLINK-2332
> URL: https://issues.apache.org/jira/browse/FLINK-2332
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 0.10
>
>
> In order to support true high availability {{TaskManager}} and {{JobManager}} 
> have to be able to distinguish whether a message was sent from the leader or 
> whether a message was sent from a former leader. Messages which come from a 
> former leader have to be discarded in order to guarantee a consistent state.
> A way to achieve this is to assign a leader session ID to a {{JobManager}} 
> once it is elected as leader. This leader session ID is sent to the 
> {{TaskManager}} upon registration at the {{JobManager}}. All subsequent 
> messages should then be decorated with this leader session ID to mark them as 
> valid. On the {{TaskManager}} side, the leader session ID received in response 
> to the registration message can then be used to validate incoming messages.
> The same holds true for registration messages which should have a 
> registration session ID, too. That way, it is possible to distinguish invalid 
> registration messages from valid ones. The registration session ID can be 
> assigned once the TaskManager is notified about the new leader.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests

2015-07-23 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638985#comment-14638985
 ] 

Till Rohrmann commented on FLINK-2392:
--

The same happened to me: 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/72303347/log.txt

> Instable test in flink-yarn-tests
> -
>
> Key: FLINK-2392
> URL: https://issues.apache.org/jira/browse/FLINK-2392
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Matthias J. Sax
>Priority: Minor
>
> The test YARNSessionFIFOITCase fails from time to time on an irregular basis. 
> For example see: https://travis-ci.org/apache/flink/jobs/72019690
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) 
>  Time elapsed: 60.651 sec  <<< FAILURE!
> java.lang.AssertionError: During the timeout period of 60 seconds the 
> expected string did not show up
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435)
> Results :
> Failed tests: 
>   
> YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478
>  During the timeout period of 60 seconds the expected string did not show up



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2398) Decouple StreamGraph Building from the API

2015-07-23 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638935#comment-14638935
 ] 

Aljoscha Krettek commented on FLINK-2398:
-

Yes, I think this is a prerequisite for several things we're working on.

I've started fiddling around with this and I have it working except for 
split/select and iterations. I think the first step is to remove the dependency 
on StreamGraph from DataStream and the other API classes. Once this is done we 
can change the representation to make it work for both stream and batch.

In the code I have right now, the transform() method of DataStream simply 
creates a OneInputTransformation, transform() on ConnectedDataStream creates a 
TwoInputTransformation, union creates a UnionTransformation, and so on...
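As a rough, purely illustrative sketch of that structure (the class names echo the ones
mentioned above, but the fields and methods here are invented, not taken from the actual
code): each API call would produce a transformation node that only remembers its inputs,
and a separate generator can later walk this tree to build the StreamGraph.

import java.util.Collections;
import java.util.List;

// Illustrative only: a tiny transformation tree that an API method could build
// instead of mutating the StreamGraph directly.
public abstract class Transformation {

    private final String name;

    protected Transformation(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    /** The upstream transformations this node consumes; a generator walks this tree. */
    public abstract List<Transformation> getInputs();
}

class SourceTransformation extends Transformation {
    SourceTransformation(String name) {
        super(name);
    }

    @Override
    public List<Transformation> getInputs() {
        return Collections.emptyList();
    }
}

class OneInputTransformation extends Transformation {
    private final Transformation input;

    OneInputTransformation(String name, Transformation input) {
        super(name);
        this.input = input;
    }

    @Override
    public List<Transformation> getInputs() {
        return Collections.singletonList(input);
    }
}

class UnionTransformation extends Transformation {
    private final List<Transformation> inputs;

    UnionTransformation(List<Transformation> inputs) {
        super("union");
        this.inputs = inputs;
    }

    @Override
    public List<Transformation> getInputs() {
        return inputs;
    }
}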

> Decouple StreamGraph Building from the API
> --
>
> Key: FLINK-2398
> URL: https://issues.apache.org/jira/browse/FLINK-2398
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Currently, the building of the StreamGraph is very intertwined with the API 
> methods. DataStream knows about the StreamGraph and keeps track of splitting, 
> selected names, unions and so on. This leads to the problem that it is very 
> hard to understand how the StreamGraph is built because the code that does it 
> is all over the place. This also makes it hard to extend/change parts of the 
> Streaming system.
> I propose to introduce "Transformations". A transformation holds information 
> about one operation: The input streams, types, names, operator and so on. An 
> API method creates a transformation instead of fiddling with the StreamGraph 
> directly. A new component, the StreamGraphGenerator creates a StreamGraph 
> from the tree of transformations that result from program specification using 
> the API methods. This would relieve DataStream from knowing about the 
> StreamGraph and makes unions, splitting, selection visible transformations 
> instead of being scattered across the different API classes as fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2400) TaskTest.testExecutionFailesAfterCanceling fails

2015-07-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi resolved FLINK-2400.

Resolution: Later

Changed the error message via fa78be6. We have to check whether this occurs 
again or not.

> TaskTest.testExecutionFailesAfterCanceling fails
> 
>
> Key: FLINK-2400
> URL: https://issues.apache.org/jira/browse/FLINK-2400
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.9, master
>Reporter: Ufuk Celebi
>Priority: Minor
> Fix For: 0.10
>
>
> The following test failed on Travis:
> TaskTest.testExecutionFailesAfterCanceling:561->validateUnregisterTask:774 
> TaskManager message is not 'UnregisterTask'
> https://travis-ci.org/uce/flink/jobs/72292328
> I think it's some flakiness in the test. It's the first time I saw this 
> failing. From the logs, I couldn't tell what went wrong. In the test there is 
> a timeout for the expected message to arrive. Maybe that's the issue, 
> [~StephanEwen]?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2400) TaskTest.testExecutionFailesAfterCanceling fails

2015-07-23 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638913#comment-14638913
 ] 

Stephan Ewen commented on FLINK-2400:
-

Would that error be triggered on timeout?

The first step should be to make the error message better: log whether it was a timeout 
or, if it was an unexpected message, which message it was.

> TaskTest.testExecutionFailesAfterCanceling fails
> 
>
> Key: FLINK-2400
> URL: https://issues.apache.org/jira/browse/FLINK-2400
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.9, master
>Reporter: Ufuk Celebi
>Priority: Minor
> Fix For: 0.10
>
>
> The following test failed on Travis:
> TaskTest.testExecutionFailesAfterCanceling:561->validateUnregisterTask:774 
> TaskManager message is not 'UnregisterTask'
> https://travis-ci.org/uce/flink/jobs/72292328
> I think it's some flakiness in the test. It's the first time I saw this 
> failing. From the logs, I couldn't tell what went wrong. In the test there is 
> a timeout for the expected message to arrive. Maybe that's the issue, 
> [~StephanEwen]?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2400) TaskTest.testExecutionFailesAfterCanceling fails

2015-07-23 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2400:
--

 Summary: TaskTest.testExecutionFailesAfterCanceling fails
 Key: FLINK-2400
 URL: https://issues.apache.org/jira/browse/FLINK-2400
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
Priority: Minor
 Fix For: 0.10


The following test failed on Travis:
TaskTest.testExecutionFailesAfterCanceling:561->validateUnregisterTask:774 
TaskManager message is not 'UnregisterTask'

https://travis-ci.org/uce/flink/jobs/72292328

I think it's some flakiness in the test. It's the first time I saw this 
failing. From the logs, I couldn't tell what went wrong. In the test there is a 
timeout for the expected message to arrive. Maybe that's the issue, 
[~StephanEwen]?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2397) Unify two backend servers to one server

2015-07-23 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2397.
-
Resolution: Fixed

Fixed via c52e753a8d3fc15ed48df7bfa24a327a90df9a0f

> Unify two backend servers to one server
> ---
>
> Key: FLINK-2397
> URL: https://issues.apache.org/jira/browse/FLINK-2397
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> Currently, the new dashboard still needs both the old backend server and the 
> new backend server. We need to migrate the requests from {{/jobsInfo}} to the 
> new server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2397) Unify two backend servers to one server

2015-07-23 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2397.
---

> Unify two backend servers to one server
> ---
>
> Key: FLINK-2397
> URL: https://issues.apache.org/jira/browse/FLINK-2397
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> Currently, the new dashboard still needs both the old backend server and the 
> new backend server. We need to migrate the requests from {{/jobsInfo}} to the 
> new server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2398) Decouple StreamGraph Building from the API

2015-07-23 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638891#comment-14638891
 ] 

Gyula Fora commented on FLINK-2398:
---

I suggest we design the API in a way that makes it possible to unify batch and 
streaming programs in the future. We should pick a representation that works 
for both, so that we can adapt the batch API later. 
(This will probably mean rewriting the StreamGraph into some common 
representation at some point.)

> Decouple StreamGraph Building from the API
> --
>
> Key: FLINK-2398
> URL: https://issues.apache.org/jira/browse/FLINK-2398
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Currently, the building of the StreamGraph is very intertwined with the API 
> methods. DataStream knows about the StreamGraph and keeps track of splitting, 
> selected names, unions and so on. This leads to the problem that it is very 
> hard to understand how the StreamGraph is built, because the code that does 
> it is all over the place. This also makes it hard to extend or change parts 
> of the streaming system.
> I propose to introduce "Transformations". A transformation holds information 
> about one operation: the input streams, types, names, operator and so on. An 
> API method creates a transformation instead of fiddling with the StreamGraph 
> directly. A new component, the StreamGraphGenerator, creates a StreamGraph 
> from the tree of transformations that results from specifying a program via 
> the API methods. This would relieve DataStream from knowing about the 
> StreamGraph and would make unions, splitting, and selection visible as 
> transformations instead of being scattered across the different API classes 
> as fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2399) Fail when actor versions don't match

2015-07-23 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2399:
--

 Summary: Fail when actor versions don't match
 Key: FLINK-2399
 URL: https://issues.apache.org/jira/browse/FLINK-2399
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
Priority: Minor
 Fix For: 0.10


Problem: there can be subtle errors when actors from different Flink versions 
communicate with each other, for example when an old client (e.g. Flink 0.9) 
communicates with a new JobManager (e.g. Flink 0.10-SNAPSHOT).

We can check that the versions match on first communication between the actors 
and fail if they don't match.
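
A minimal sketch of the idea, with hypothetical message and field names (not 
the actual Flink actor messages):

{code}
// Hypothetical registration message that carries the sender's Flink version.
case class RegisterAtJobManager(taskManagerAddress: String, flinkVersion: String)

object VersionCheck {
  // Checked on the first communication between the actors: if the versions
  // differ, the registration is rejected instead of failing later with
  // subtle, hard-to-diagnose errors.
  def checkCompatibility(localVersion: String, remoteVersion: String): Either[String, Unit] =
    if (localVersion == remoteVersion) Right(())
    else Left(s"Version mismatch: local version is $localVersion, remote version is $remoteVersion")
}
{code}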



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2205) Confusing entries in JM Webfrontend Job Configuration section

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638815#comment-14638815
 ] 

ASF GitHub Bot commented on FLINK-2205:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/927#issuecomment-124109471
  
I think you can actually add the code to the current UI. It is not going to 
disappear in the near future; it will probably stay as a fallback variant for 
one more version.

You can add this functionality there. Adding it to the new web frontend is 
a separate effort. If you want, you can be involved there as well.


> Confusing entries in JM Webfrontend Job Configuration section
> -
>
> Key: FLINK-2205
> URL: https://issues.apache.org/jira/browse/FLINK-2205
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Priority: Minor
>  Labels: starter
>
> The Job Configuration section of the job history / analyze page of the 
> JobManager web interface contains two confusing entries:
> - {{Number of execution retries}} is actually the maximum number of retries 
> and should be renamed accordingly. The default value is -1 and should be 
> changed to "deactivated" (or 0).
> - {{Job parallelism}}, which is -1 by default. A parallelism of -1 is not 
> very meaningful; it would be better to show something like "auto".
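
A small sketch of how the two values could be rendered (illustrative only; 
these helpers are not part of the actual web frontend code):

{code}
object ConfigDisplay {
  // -1 means that no retries are configured; show a label instead of the raw value.
  def maxExecutionRetries(retries: Int): String =
    if (retries < 0) "deactivated" else retries.toString

  // -1 means "use the default parallelism"; show "auto" instead of -1.
  def jobParallelism(parallelism: Int): String =
    if (parallelism < 0) "auto" else parallelism.toString
}
{code}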



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2205] Fix confusing entries in JM UI Jo...

2015-07-23 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/927#issuecomment-124109471
  
I think you can actually add the code to the current UI. It is not going to 
disappear in the near future; it will probably stay as a fallback variant for 
one more version.

You can add this functionality there. Adding it to the new web frontend is 
a separate effort. If you want, you can be involved there as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2398) Decouple StreamGraph Building from the API

2015-07-23 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-2398:
---

 Summary: Decouple StreamGraph Building from the API
 Key: FLINK-2398
 URL: https://issues.apache.org/jira/browse/FLINK-2398
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


Currently, the building of the StreamGraph is very intertwined with the API 
methods. DataStream knows about the StreamGraph and keeps track of splitting, 
selected names, unions and so on. This leads to the problem that it is very 
hard to understand how the StreamGraph is built, because the code that does it 
is all over the place. This also makes it hard to extend or change parts of the 
streaming system.

I propose to introduce "Transformations". A transformation holds information 
about one operation: the input streams, types, names, operator and so on. An 
API method creates a transformation instead of fiddling with the StreamGraph 
directly. A new component, the StreamGraphGenerator, creates a StreamGraph from 
the tree of transformations that results from specifying a program via the API 
methods. This would relieve DataStream from knowing about the StreamGraph and 
would make unions, splitting, and selection visible as transformations instead 
of being scattered across the different API classes as fields.
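
A rough sketch of what such a transformation tree and generator could look like 
(hypothetical names and fields, a simplification of the proposal rather than 
the actual implementation):

{code}
// One node per API operation; the API produces a tree of these nodes.
sealed trait Transformation {
  def name: String
  def inputs: Seq[Transformation]
}

case class SourceTransformation(name: String) extends Transformation {
  val inputs: Seq[Transformation] = Seq.empty
}
case class OneInputTransformation(name: String, input: Transformation) extends Transformation {
  val inputs: Seq[Transformation] = Seq(input)
}
case class UnionTransformation(name: String, inputs: Seq[Transformation]) extends Transformation

// Separate component that turns the transformation tree into graph edges;
// DataStream itself no longer needs to know about the StreamGraph.
object StreamGraphGenerator {
  def generate(sinks: Seq[Transformation]): Set[(String, String)] = {
    def visit(t: Transformation): Set[(String, String)] =
      t.inputs.flatMap(in => visit(in) + (in.name -> t.name)).toSet
    sinks.flatMap(visit).toSet
  }
}
{code}

An API call such as map() would then simply wrap its input in a 
OneInputTransformation, and the generator would be invoked once, when the 
program is executed.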



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2385) Scala DataSet.distinct should have parenthesis

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638766#comment-14638766
 ] 

ASF GitHub Bot commented on FLINK-2385:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/933#issuecomment-124088349
  
Nope. +1 from my side.


> Scala DataSet.distinct should have parenthesis
> --
>
> Key: FLINK-2385
> URL: https://issues.apache.org/jira/browse/FLINK-2385
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> The method is not a side-effect-free accessor, but defines heavy computation, 
> even if it does not mutate the original data set.
> This is a somewhat API-breaking change.
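
The Scala convention behind the change, shown on a toy type (not the actual 
Flink Scala API source):

{code}
class Numbers(private val data: Seq[Int]) {
  // Parameterless (no parentheses) suggests a cheap, side-effect-free accessor.
  def size: Int = data.length

  // Empty parentheses signal that the call defines (potentially heavy)
  // computation, even though the original data is not mutated.
  def distinct(): Numbers = new Numbers(data.distinct)
}

object Example extends App {
  val numbers = new Numbers(Seq(1, 1, 2, 3, 3))
  val deduplicated = numbers.distinct()   // callers now write distinct() with ()
  println(deduplicated.size)              // prints 3
}
{code}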



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2385] [scala api] [api breaking] Add pa...

2015-07-23 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/933#issuecomment-124088349
  
Nope. +1 from my side.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2385) Scala DataSet.distinct should have parenthesis

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638757#comment-14638757
 ] 

ASF GitHub Bot commented on FLINK-2385:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/933#issuecomment-124086584
  
Any reservation against this?

(The one failed build profile is due to a Travis hiccup)


> Scala DataSet.distinct should have parenthesis
> --
>
> Key: FLINK-2385
> URL: https://issues.apache.org/jira/browse/FLINK-2385
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> The method is not a side-effect-free accessor, but defines heavy computation, 
> even if it does not mutate the original data set.
> This is a somewhat API-breaking change.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2385] [scala api] [api breaking] Add pa...

2015-07-23 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/933#issuecomment-124086584
  
Any reservation against this?

(The one failed build profile is due to a Travis hiccup)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2397) Unify two backend servers to one server

2015-07-23 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2397:
---

 Summary: Unify two backend servers to one server
 Key: FLINK-2397
 URL: https://issues.apache.org/jira/browse/FLINK-2397
 Project: Flink
  Issue Type: Sub-task
  Components: Webfrontend
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 0.10


Currently, the new dashboard still needs both the old backend server and the 
new backend server. We need to migrate the requests from {{/jobsInfo}} to the 
new server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2389) Add dashboard frontend architecture and build infrastructure

2015-07-23 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2389.
---

> Add dashboard frontend architecture and build infrastructure
> ---
>
> Key: FLINK-2389
> URL: https://issues.apache.org/jira/browse/FLINK-2389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> Add the build infrastructure and basic libraries for a modern and modular web 
> dashboard.
> The dashboard is going to be built using angular.js.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-23 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638617#comment-14638617
 ] 

Chengxiang Li commented on FLINK-1901:
--

Thanks for the analysis, [~trohrm...@apache.org]. I've created FLINK-2396 as 
follow-up work to support sampled and other similar datasets in iterations.

> Create sample operator for Dataset
> --
>
> Key: FLINK-1901
> URL: https://issues.apache.org/jira/browse/FLINK-1901
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Theodore Vasiloudis
>Assignee: Chengxiang Li
>
> In order to be able to implement Stochastic Gradient Descent and a number of 
> other machine learning algorithms we need to have a way to take a random 
> sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset, 
> choose the relative size of the sample, and set a seed for reproducibility.
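
A minimal sketch of the requested semantics on plain Scala collections 
(illustrative only; the actual operator works on a distributed DataSet):

{code}
import scala.util.Random

object Sampling {
  // Bernoulli sampling: each element is kept independently with probability
  // `fraction`; the seed makes the sample reproducible.
  def sampleWithoutReplacement[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    val rng = new Random(seed)
    data.filter(_ => rng.nextDouble() < fraction)
  }

  // Sampling with replacement: draw round(fraction * size) elements uniformly,
  // so the same element may appear several times.
  def sampleWithReplacement[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] =
    if (data.isEmpty) Seq.empty
    else {
      val rng = new Random(seed)
      val n = math.round(fraction * data.size).toInt
      Seq.fill(n)(data(rng.nextInt(data.size)))
    }
}
{code}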



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2341) Deadlock in SpilledSubpartitionViewAsyncIO

2015-07-23 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638613#comment-14638613
 ] 

Ufuk Celebi commented on FLINK-2341:


I think this is an issue with the test.

In general, I think that the asynchronous reader variants were premature in the 
first place (I did this). They add quite some complexity and their merit is 
unmeasured performance-wise. We would probably not recommend that anyone use 
this variant at the moment.

I am thinking about whether it would be better to just remove the async reader 
variants and take them up again only if they become necessary.

> Deadlock in SpilledSubpartitionViewAsyncIO
> --
>
> Key: FLINK-2341
> URL: https://issues.apache.org/jira/browse/FLINK-2341
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Affects Versions: 0.9, 0.10
>Reporter: Stephan Ewen
>Assignee: Ufuk Celebi
>Priority: Critical
> Fix For: 0.9, 0.10
>
>
> It may be that the deadlock is because of the way the 
> {{SpilledSubpartitionViewTest}} is written
> {code}
> Found one Java-level deadlock:
> =
> "pool-25-thread-2":
>   waiting to lock monitor 0x7f66f4932468 (object 0xfa1478f0, a 
> java.lang.Object),
>   which is held by "IOManager reader thread #1"
> "IOManager reader thread #1":
>   waiting to lock monitor 0x7f66f4931160 (object 0xfa029768, a 
> java.lang.Object),
>   which is held by "pool-25-thread-2"
> Java stack information for the threads listed above:
> ===
> "pool-25-thread-2":
>   at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.notifyError(SpilledSubpartitionViewAsyncIO.java:304)
>   - waiting to lock <0xfa1478f0> (a java.lang.Object)
>   at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.onAvailableBuffer(SpilledSubpartitionViewAsyncIO.java:256)
>   at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$300(SpilledSubpartitionViewAsyncIO.java:42)
>   at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:367)
>   at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:353)
>   at 
> org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:135)
>   - locked <0xfa029768> (a java.lang.Object)
>   at 
> org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119)
>   - locked <0xfa3a1a20> (a java.lang.Object)
>   at 
> org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:95)
>   at 
> org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:39)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:701)
> "IOManager reader thread #1":
>   at 
> org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:127)
>   - waiting to lock <0xfa029768> (a java.lang.Object)
>   at 
> org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119)
>   - locked <0xfa3a1ea0> (a java.lang.Object)
>   at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.returnBufferFromIOThread(SpilledSubpartitionViewAsyncIO.java:270)
>   - locked <0xfa1478f0> (a java.lang.Object)
>   at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$100(SpilledSubpartitionViewAsyncIO.java:42)
>   at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:338)
>   at 
> org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:328)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.handleProcessedBuffer(AsynchronousFileIOChannel.java:199)
>   at 
> org.apache.flink.runtime.io.disk.iomanager.BufferReadRequest.requestDone(AsynchronousFileIOChannel.java:431)
>   at 
> org.apache.flink.r

[jira] [Created] (FLINK-2396) Review the datasets of dynamic path and static path in iteration.

2015-07-23 Thread Chengxiang Li (JIRA)
Chengxiang Li created FLINK-2396:


 Summary: Review the datasets of dynamic path and static path in 
iteration.
 Key: FLINK-2396
 URL: https://issues.apache.org/jira/browse/FLINK-2396
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Chengxiang Li
Priority: Minor


Currently, Flink caches a dataset on the static path because it assumes that 
the dataset stays the same during the iteration, but this assumption is not 
always true. Take sampling as an example: the iteration dataset is something 
like the weight vector of a model, and there is a separate training dataset 
from which a small sample is taken to update the weight vector in each 
iteration (e.g. Stochastic Gradient Descent). We expect the sampled dataset to 
be different in each iteration, but Flink caches it because it lies on the 
static path.
We should review how Flink identifies the dynamic and static paths, and support 
adding the sampled dataset in the above example to the dynamic path.
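
A plain-Scala illustration of the difference (not Flink code): with the current 
behaviour the sample is effectively drawn once and reused, while SGD-style 
algorithms need a fresh sample per iteration.

{code}
import scala.util.Random

object StaticVsDynamicPath extends App {
  val trainingData = (1 to 100).map(_.toDouble)
  val rng = new Random(42L)

  def sample(): Seq[Double] = trainingData.filter(_ => rng.nextDouble() < 0.1)

  // "Static path" behaviour: the sample is computed once and cached,
  // so every iteration sees the same elements.
  val cachedSample = sample()
  var sumOverCached = 0.0
  for (_ <- 1 to 3) sumOverCached += cachedSample.sum

  // Desired "dynamic path" behaviour: a fresh sample is drawn in each iteration.
  var sumOverFresh = 0.0
  for (_ <- 1 to 3) sumOverFresh += sample().sum

  println(s"cached: $sumOverCached, fresh: $sumOverFresh")
}
{code}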



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2395) Check Scala catch blocks for Throwable

2015-07-23 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638609#comment-14638609
 ] 

Maximilian Michels commented on FLINK-2395:
---

On the master, we currently have 72 {{Throwable}} catch blocks in the Scala 
code. Probably most of them can be safely converted to catch {{Exception}} 
instead. Some corner cases may remain, e.g. Hadoop uses {{FSError}}, subclassed 
from Throwable, to signal file system errors, but that should be handled by 
Java code. Then of course there are virtual machine errors like 
{{OutOfMemoryError}}.
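
A short sketch of the conversion, using scala.util.control.NonFatal, which 
deliberately does not match control-flow throwables or virtual machine errors 
(standard Scala, not Flink code):

{code}
import scala.util.control.NonFatal

object CatchBlocks {
  // Before: catching Throwable also swallows Scala's control-flow exceptions
  // and fatal errors such as OutOfMemoryError.
  def riskyCatchAll(action: () => Unit): Unit =
    try action() catch {
      case t: Throwable => println(s"caught everything, including fatal errors: $t")
    }

  // After: NonFatal matches ordinary exceptions but lets control-flow
  // throwables and virtual machine errors propagate.
  def saferCatch(action: () => Unit): Unit =
    try action() catch {
      case NonFatal(e) => println(s"caught non-fatal exception: $e")
    }
}
{code}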

> Check Scala catch blocks for Throwable
> --
>
> Key: FLINK-2395
> URL: https://issues.apache.org/jira/browse/FLINK-2395
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Priority: Minor
>
> As described in [1], it's not a good practice to catch {{Throwables}} in 
> Scala catch blocks because Scala uses some special exceptions for the control 
> flow. Therefore we should check whether we can restrict ourselves to only 
> catching subtypes of {{Throwable}}, such as {{Exception}}, instead.
> [1] 
> https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2213) Configure number of vcores

2015-07-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-2213:
---
Assignee: (was: Ufuk Celebi)

> Configure number of vcores
> --
>
> Key: FLINK-2213
> URL: https://issues.apache.org/jira/browse/FLINK-2213
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN Client
>Affects Versions: master
>Reporter: Ufuk Celebi
>
> Currently, the number of vcores per YARN container is set to 1.
> It is desirable to allow configuring this value. As a simple heuristic it 
> makes sense to at least set it to the number of slots per container.
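
A small sketch of the heuristic (illustrative only; names are assumptions, not 
the actual YARN client code):

{code}
object VcoreHeuristic {
  // Use an explicitly configured value if present; otherwise fall back to at
  // least one vcore per task slot in the container.
  def vcoresPerContainer(configuredVcores: Option[Int], slotsPerContainer: Int): Int =
    configuredVcores.getOrElse(math.max(1, slotsPerContainer))
}
{code}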



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2395) Check Scala catch blocks for Throwable

2015-07-23 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638608#comment-14638608
 ] 

Stephan Ewen commented on FLINK-2395:
-

At some points, catching {{Throwable}} is probably okay, such as at the root of 
the JobManager / TaskManager process initialization.

> Check Scala catch blocks for Throwable
> --
>
> Key: FLINK-2395
> URL: https://issues.apache.org/jira/browse/FLINK-2395
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Priority: Minor
>
> As described in [1], it's not a good practice to catch {{Throwables}} in 
> Scala catch blocks because Scala uses some special exceptions for the control 
> flow. Therefore we should check whether we can restrict ourselves to only 
> catching subtypes of {{Throwable}}, such as {{Exception}}, instead.
> [1] 
> https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-1863) RemoteInputChannelTest.testConcurrentOnBufferAndRelease fails on travis

2015-07-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi resolved FLINK-1863.

Resolution: Cannot Reproduce

> RemoteInputChannelTest.testConcurrentOnBufferAndRelease fails on travis
> ---
>
> Key: FLINK-1863
> URL: https://issues.apache.org/jira/browse/FLINK-1863
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Ufuk Celebi
>
> {code}
> testConcurrentOnBufferAndRelease(org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest)
>   Time elapsed: 120.022 sec  <<< ERROR!
> java.lang.Exception: test timed out after 12 milliseconds
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:248)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:111)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.testConcurrentOnBufferAndRelease(RemoteInputChannelTest.java:124)
> {code}
> This is the build: 
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/57943450/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2395) Check Scala catch blocks for Throwable

2015-07-23 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638603#comment-14638603
 ] 

Ufuk Celebi commented on FLINK-2395:


Interesting link. Thanks for sharing!

> Check Scala catch blocks for Throwable
> --
>
> Key: FLINK-2395
> URL: https://issues.apache.org/jira/browse/FLINK-2395
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Priority: Minor
>
> As described in [1], it's not a good practice to catch {{Throwables}} in 
> Scala catch blocks because Scala uses some special exceptions for the control 
> flow. Therefore we should check whether we can restrict ourselves to only 
> catching subtypes of {{Throwable}}, such as {{Exception}}, instead.
> [1] 
> https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2332) Assign session IDs to JobManager and TaskManager messages

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638589#comment-14638589
 ] 

ASF GitHub Bot commented on FLINK-2332:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-124038143
  
Will rebase and then merge this PR.


> Assign session IDs to JobManager and TaskManager messages
> -
>
> Key: FLINK-2332
> URL: https://issues.apache.org/jira/browse/FLINK-2332
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 0.10
>
>
> In order to support true high availability, {{TaskManager}} and {{JobManager}} 
> have to be able to distinguish whether a message was sent by the current 
> leader or by a former leader. Messages that come from a former leader have to 
> be discarded in order to guarantee a consistent state.
> A way to achieve this is to assign a leader session ID to a {{JobManager}} 
> once it is elected leader. This leader session ID is sent to the 
> {{TaskManager}} upon registration at the {{JobManager}}. All subsequent 
> messages should then be decorated with this leader session ID to mark them as 
> valid. On the {{TaskManager}} side, the leader session ID received in response 
> to the registration message can then be used to validate incoming messages.
> The same holds true for registration messages, which should carry a 
> registration session ID, too. That way, it is possible to distinguish invalid 
> registration messages from valid ones. The registration session ID can be 
> assigned once the TaskManager is notified about the new leader.
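
A minimal sketch of the decoration and validation (hypothetical types, not the 
actual Flink runtime classes):

{code}
import java.util.UUID

// Every message sent after registration is wrapped with the leader session ID.
case class LeaderSessionMessage(leaderSessionId: UUID, payload: Any)

class SessionValidator(expectedLeaderSessionId: UUID) {
  // Messages from a former leader carry a stale session ID and are discarded.
  def filter(message: LeaderSessionMessage): Option[Any] =
    if (message.leaderSessionId == expectedLeaderSessionId) Some(message.payload)
    else None
}
{code}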



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-23 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-124038143
  
Will rebase and then merge this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-2395) Check Scala catch blocks for Throwable

2015-07-23 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-2395:


 Summary: Check Scala catch blocks for Throwable
 Key: FLINK-2395
 URL: https://issues.apache.org/jira/browse/FLINK-2395
 Project: Flink
  Issue Type: Improvement
Reporter: Till Rohrmann
Priority: Minor


As described in [1], it's not a good practice to catch {{Throwables}} in Scala 
catch blocks because Scala uses some special exceptions for the control flow. 
Therefore we should check whether we can restrict ourselves to only catching 
subtypes of {{Throwable}}, such as {{Exception}}, instead.

[1] 
https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2373) Add configuration parameter to createRemoteEnvironment method

2015-07-23 Thread Andreas Kunft (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638525#comment-14638525
 ] 

Andreas Kunft commented on FLINK-2373:
--

I will

> Add configuration parameter to createRemoteEnvironment method
> -
>
> Key: FLINK-2373
> URL: https://issues.apache.org/jira/browse/FLINK-2373
> Project: Flink
>  Issue Type: Bug
>  Components: other
>Reporter: Andreas Kunft
>Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently there is no way to provide a custom configuration upon creation of 
> a remote environment (via ExecutionEnvironment.createRemoteEnvironment(...)).
> This leads to errors when the submitted job exceeds the default value for the 
> maximum payload size in Akka, as we cannot increase the configuration value 
> (akka.remote.OversizedPayloadException: Discarding oversized payload...).
> Providing an overloaded method with a configuration parameter for the remote 
> environment fixes that.
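
A sketch of the proposed overload; the variant taking a Configuration is the 
proposal, not an existing Flink method, and the body is left unimplemented:

{code}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object RemoteEnvironments {
  // Proposed addition: like createRemoteEnvironment(host, port, jarFiles),
  // but with a client Configuration so limits such as the maximum Akka
  // payload size can be raised for large job graphs.
  def createRemoteEnvironment(
      host: String,
      port: Int,
      clientConfiguration: Configuration,
      jarFiles: String*): ExecutionEnvironment = ???
}
{code}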



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2205] Fix confusing entries in JM UI Jo...

2015-07-23 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/927#issuecomment-124019229
  
@ebautistabar you're the only one who can close the PR. But maybe the new 
web interface needs the same fixes as you've proposed for the old UI.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2205) Confusing entries in JM Webfrontend Job Configuration section

2015-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638458#comment-14638458
 ] 

ASF GitHub Bot commented on FLINK-2205:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/927#issuecomment-124019229
  
@ebautistabar you're the only one who can close the PR. But maybe the new 
web interface needs the same fixes as you've proposed for the old UI.


> Confusing entries in JM Webfrontend Job Configuration section
> -
>
> Key: FLINK-2205
> URL: https://issues.apache.org/jira/browse/FLINK-2205
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Priority: Minor
>  Labels: starter
>
> The Job Configuration section of the job history / analyze page of the 
> JobManager web interface contains two confusing entries:
> - {{Number of execution retries}} is actually the maximum number of retries 
> and should be renamed accordingly. The default value is -1 and should be 
> changed to "deactivated" (or 0).
> - {{Job parallelism}}, which is -1 by default. A parallelism of -1 is not 
> very meaningful; it would be better to show something like "auto".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-23 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638448#comment-14638448
 ] 

Till Rohrmann commented on FLINK-1901:
--

Currently, what happens to decide whether an operator is on a dynamic path or 
not is to look at the inputs of the operator: if they are dynamic, so is the 
current operator. The iteration {{DataSets}}, {{WorksetPlaceHolder}}, 
{{SolutionSetPlaceHolder}} and {{PartialSolutionPlaceHolder}}, are always 
dynamic. One idea could be to allow other operators to be declared dynamic as 
well; that way they can also start a dynamic path. Afterwards, we have to make 
sure that not only the iteration {{DataSets}} get an {{IterationHead}} 
prepended, which kicks off the iterations, but also all the other operators 
that start a dynamic path. 
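
A toy sketch of the propagation rule described above (hypothetical operator 
model, not the Flink optimizer code):

{code}
// Minimal operator model: an operator is dynamic if it is explicitly declared
// dynamic (e.g. an iteration placeholder, or a sample that must be re-drawn in
// every superstep) or if any of its inputs is dynamic.
case class Operator(name: String, inputs: Seq[Operator], declaredDynamic: Boolean = false)

object DynamicPath {
  def isDynamic(op: Operator): Boolean =
    op.declaredDynamic || op.inputs.exists(isDynamic)
}

object DynamicPathExample extends App {
  val training = Operator("training-source", Seq.empty)
  val sample   = Operator("sample", Seq(training), declaredDynamic = true)
  val workset  = Operator("workset-placeholder", Seq.empty, declaredDynamic = true)
  val update   = Operator("update-weights", Seq(workset, sample))

  println(DynamicPath.isDynamic(training)) // false: purely static
  println(DynamicPath.isDynamic(update))   // true: reached from a dynamic operator
}
{code}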

> Create sample operator for Dataset
> --
>
> Key: FLINK-1901
> URL: https://issues.apache.org/jira/browse/FLINK-1901
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Theodore Vasiloudis
>Assignee: Chengxiang Li
>
> In order to be able to implement Stochastic Gradient Descent and a number of 
> other machine learning algorithms we need to have a way to take a random 
> sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset, 
> choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-23 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638429#comment-14638429
 ] 

Till Rohrmann commented on FLINK-1901:
--

That's a good idea to break down the task. Do you want to take the lead 
[~chengxiang li]?

> Create sample operator for Dataset
> --
>
> Key: FLINK-1901
> URL: https://issues.apache.org/jira/browse/FLINK-1901
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Theodore Vasiloudis
>Assignee: Chengxiang Li
>
> In order to be able to implement Stochastic Gradient Descent and a number of 
> other machine learning algorithms we need to have a way to take a random 
> sample from a Dataset.
> We need to be able to sample with or without replacement from the Dataset, 
> choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)