[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14640007#comment-14640007 ]

Stefano Bortoli commented on FLINK-2394:
----------------------------------------

I see. I was in fact a little disappointed with mongo-hadoop for having a getOutputCommitter() method that is not standard in the OutputFormat. However, my first implementation used the mapred Hadoop API, so no big surprises. A good approach could be either to have two HadoopOutputFormatBase classes (one per Hadoop version), or to handle the Hadoop version when obtaining the OutputCommitter. Meanwhile, I have implemented my own MongoHadoopOutputFormat, extending HadoopOutputFormat and overriding the open and close methods to replace the FileOutputCommitter with the MongoOutputCommitter.

> HadoopOutFormat OutputCommitter is default to FileOutputCommiter
> ----------------------------------------------------------------
>
>                 Key: FLINK-2394
>                 URL: https://issues.apache.org/jira/browse/FLINK-2394
>             Project: Flink
>          Issue Type: Bug
>      Components: Hadoop Compatibility
>    Affects Versions: 0.9.0
>        Reporter: Stefano Bortoli
>
> MongoOutputFormat does not write back to the collection because the
> HadoopOutputFormat wrapper does not allow setting the MongoOutputCommitter and
> defaults to FileOutputCommitter. Therefore, on close and
> finalizeGlobal execution the commit does not happen and the Mongo collection
> stays untouched.
> A simple solution would be to:
> 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat
>     that takes the OutputCommitter as a parameter
> 2 - change the outputCommitter field of HadoopOutputFormatBase to be a
>     generic OutputCommitter
> 3 - remove the default assignment of the outputCommitter to
>     FileOutputCommitter in open() and finalizeGlobal(), or keep it as a
>     default in case of no specific assignment.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
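The workaround described above (a subclass that overrides open/close and swaps the hard-coded FileOutputCommitter for a Mongo-specific committer) can be sketched roughly as follows. All types below are simplified, hypothetical stand-ins, not the real Hadoop, mongo-hadoop, or Flink classes:

```java
// Sketch of the override pattern: the wrapper hard-codes a FileOutputCommitter
// in open(), so a subclass replaces it before close() triggers the commit.
// Every type here is a stand-in defined for illustration only.
interface OutputCommitter {
    void commitTask();
}

class FileOutputCommitter implements OutputCommitter {
    public void commitTask() {
        // commits to the file system -- a no-op for a MongoDB-backed sink
    }
}

class MongoOutputCommitter implements OutputCommitter {
    public void commitTask() {
        System.out.println("committed to MongoDB");
    }
}

class HadoopOutputFormat {
    protected OutputCommitter outputCommitter;

    public void open() {
        // the problematic hard-coded default described in the issue
        this.outputCommitter = new FileOutputCommitter();
    }

    public void close() {
        this.outputCommitter.commitTask();
    }
}

// The workaround: override open() and replace the default committer.
class MongoHadoopOutputFormat extends HadoopOutputFormat {
    @Override
    public void open() {
        super.open();
        this.outputCommitter = new MongoOutputCommitter();
    }
}

class CommitterOverrideDemo {
    public static void main(String[] args) {
        HadoopOutputFormat format = new MongoHadoopOutputFormat();
        format.open();
        format.close(); // prints "committed to MongoDB"
    }
}
```

The constructor-parameter fix proposed in the issue would make this subclassing unnecessary, since callers could pass any OutputCommitter directly.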
[jira] [Commented] (FLINK-2106) Add outer joins to API, Optimizer, and Runtime
[ https://issues.apache.org/jira/browse/FLINK-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639686#comment-14639686 ]

Fabian Hueske commented on FLINK-2106:
--------------------------------------

Hi [~jkovacs] and [~r-pogalz], I had a look at your PR for FLINK-2105. Looks very good to me. We should run a few more tests, but after that I am confident that we can merge the PR. I'm assigning this issue to you.

> Add outer joins to API, Optimizer, and Runtime
> ----------------------------------------------
>
>                 Key: FLINK-2106
>                 URL: https://issues.apache.org/jira/browse/FLINK-2106
>             Project: Flink
>          Issue Type: Sub-task
>      Components: Java API, Local Runtime, Optimizer, Scala API
>        Reporter: Fabian Hueske
>        Assignee: Ricky Pogalz
>       Priority: Minor
>        Fix For: pre-apache
>
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala), to
> the optimizer, and the runtime of Flink.
> Initially, the execution strategy should be a sort-merge outer join
> (FLINK-2105) but can later be extended to hash joins for left/right outer
> joins.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (FLINK-2106) Add outer joins to API, Optimizer, and Runtime
[ https://issues.apache.org/jira/browse/FLINK-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-2106:
---------------------------------
    Assignee: Ricky Pogalz

> Add outer joins to API, Optimizer, and Runtime
> ----------------------------------------------
>
>                 Key: FLINK-2106
>                 URL: https://issues.apache.org/jira/browse/FLINK-2106
>             Project: Flink
>          Issue Type: Sub-task
>      Components: Java API, Local Runtime, Optimizer, Scala API
>        Reporter: Fabian Hueske
>        Assignee: Ricky Pogalz
>       Priority: Minor
>        Fix For: pre-apache
>
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala), to
> the optimizer, and the runtime of Flink.
> Initially, the execution strategy should be a sort-merge outer join
> (FLINK-2105) but can later be extended to hash joins for left/right outer
> joins.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm
[ https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639681#comment-14639681 ]

ASF GitHub Bot commented on FLINK-2105:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/907#issuecomment-124270766

Very nice work! :+1: I had just a few minor remarks regarding style. It would be nice if you could check the overhead of having all outer join cases handled by the same class vs. having a dedicated class for LEFT, RIGHT, and FULL OUTER. I would also like to run a major regression benchmark to double-check that the refactoring of the current sort-merge join didn't cause a performance regression. I can take care of that once you have addressed the comments. If all lights are green after that, I would be happy to merge this PR.

> Implement Sort-Merge Outer Join algorithm
> -----------------------------------------
>
>                 Key: FLINK-2105
>                 URL: https://issues.apache.org/jira/browse/FLINK-2105
>             Project: Flink
>          Issue Type: Sub-task
>      Components: Local Runtime
>        Reporter: Fabian Hueske
>        Assignee: Ricky Pogalz
>       Priority: Minor
>        Fix For: pre-apache
>
> Flink does not natively support outer joins at the moment.
> This issue proposes to implement a sort-merge outer join algorithm that can
> cover left, right, and full outer joins.
> The implementation can be based on the regular sort-merge join iterator
> ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}, see also
> the {{MatchDriver}} class).
> The Reusing and NonReusing variants differ in whether object instances are
> reused or new objects are created. I would start with the NonReusing variant,
> which is safer from a user's point of view and should also be easier to
> implement.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
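The merge step of the sort-merge outer join discussed in this thread can be sketched in a few lines. This is an illustrative toy, not the Flink implementation: it assumes both inputs are pre-sorted on unique integer keys (Flink's iterators instead advance over key *groups* and must cross-join duplicate keys), and it models rows as `int[]{key, value}` with `"null"` standing in for a missing partner:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy sort-merge outer join over two inputs sorted on unique int keys.
// Covers LEFT, RIGHT and FULL by null-padding keys missing on one side.
class MergeOuterJoin {

    enum OuterJoinType { LEFT, RIGHT, FULL }

    // Each element is int[]{key, value}; output rows are "leftValue|rightValue".
    static List<String> join(List<int[]> left, List<int[]> right, OuterJoinType type) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int[] l = left.get(i), r = right.get(j);
            if (l[0] == r[0]) {                       // matching keys: emit the pair
                out.add(l[1] + "|" + r[1]);
                i++; j++;
            } else if (l[0] < r[0]) {                 // left key has no right partner
                if (type != OuterJoinType.RIGHT) out.add(l[1] + "|null");
                i++;
            } else {                                  // right key has no left partner
                if (type != OuterJoinType.LEFT) out.add("null|" + r[1]);
                j++;
            }
        }
        while (i < left.size()) {                     // drain unmatched left tail
            if (type != OuterJoinType.RIGHT) out.add(left.get(i++)[1] + "|null");
            else i++;
        }
        while (j < right.size()) {                    // drain unmatched right tail
            if (type != OuterJoinType.LEFT) out.add("null|" + right.get(j++)[1]);
            else j++;
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> left = Arrays.asList(new int[]{1, 10}, new int[]{2, 20}, new int[]{4, 40});
        List<int[]> right = Arrays.asList(new int[]{2, 200}, new int[]{3, 300}, new int[]{4, 400});
        System.out.println(join(left, right, OuterJoinType.FULL));
        // prints [10|null, 20|200, null|300, 40|400]
    }
}
```

The three branches in the loop are mutually exclusive, which is exactly the structure the review comments below discuss for `callWithNextKey()`; the real iterator additionally buffers each key group so duplicate keys produce a cross product.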
[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/907#discussion_r35384117

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java ---
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+public class ReusingSortMergeOuterJoinIteratorITCase {
+
+	// total memory
+	private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+	private static final int PAGES_FOR_BNLJN = 2;
+
+	// the size of the left and right inputs
+	private static final int INPUT_1_SIZE = 2;
+
+	private static final int INPUT_2_SIZE = 1000;
+
+	// random seeds for the left and right input data generators
+	private static final long SEED1 = 561349061987311L;
+
+	private static final long SEED2 = 231434613412342L;
+
+	// dummy abstract task
+	private final AbstractInvokable parentTask = new DummyInvokable();
+
+	private IOManager ioManager;
+	private MemoryManager memoryManager;
+
+	private TupleTypeInfo> typeInfo1;
+	private TupleTypeInfo> typeInfo2;
+	private TupleSerializer> serializer1;
+	private TupleSerializer> serializer2;
+	private TypeComparator> comparator1;
+	private TypeComparator> comparator2;
+	private TypePairComparator, Tuple2> pairComp;
+
+
+	@Before
+	public void beforeTest() {
+		ExecutionConfig config = new ExecutionConfig();
+		config.disableObjectReuse();
+
+		typeInfo1 = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
+		typeInfo2 = TupleTy
[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm
[ https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639640#comment-14639640 ]

ASF GitHub Bot commented on FLINK-2105:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/907#discussion_r35381702

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java ---
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
+import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.util.ResettableMutableObjectIterator;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NonReusingSortMergeOuterJoinIteratorITCase {
+
+	// total memory
+	private static final int MEMORY_SIZE = 1024 * 1024 * 16;
+	private static final int PAGES_FOR_BNLJN = 2;
+
+	// the size of the left and right inputs
+	private static final int INPUT_1_SIZE = 2;
+
+	private static final int INPUT_2_SIZE = 1000;
+
+	// random seeds for the left and right input data generators
+	private static final long SEED1 = 561349061987311L;
+
+	private static final long SEED2 = 231434613412342L;
+
+	// dummy abstract task
+	private final AbstractInvokable parentTask = new DummyInvokable();
+
+	private IOManager ioManager;
+	private MemoryManager memoryManager;
+
+	private TupleTypeInfo> typeInfo1;
+	private TupleTypeInfo> typeInfo2;
+	private TupleSerializer> serializer1;
+	pr
[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm
[ https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639621#comment-14639621 ]

ASF GitHub Bot commented on FLINK-2105:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/907#discussion_r35379539

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the
+ * outer join through a sort-merge join strategy.
+ */
+public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator {
+
+	public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+	private Boolean it1Empty;
+	private Boolean it2Empty;
+
+	private OuterJoinType outerJoinType;
+
+
+	public AbstractMergeOuterJoinIterator(
+			OuterJoinType outerJoinType,
+			MutableObjectIterator input1,
+			MutableObjectIterator input2,
+			TypeSerializer serializer1, TypeComparator comparator1,
+			TypeSerializer serializer2, TypeComparator comparator2,
+			TypePairComparator pairComparator,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			int numMemoryPages,
+			AbstractInvokable parentTask)
+			throws MemoryAllocationException {
+		super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask);
+
+		this.outerJoinType = outerJoinType;
+	}
+
+	/**
+	 * Calls the JoinFunction#match() method for all two key-value pairs that share the same key and come
+	 * from different inputs. Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all key-value
+	 * pairs where no matching partner from the other input exists are joined with null.
+	 * The output of the match() method is forwarded.
+	 *
+	 * @throws Exception Forwards all exceptions from the user code and the I/O system.
+	 * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector)
+	 */
+	@Override
+	public boolean callWithNextKey(final FlatJoinFunction matchFunction, final Collector collector) throws Exception {
+		if (it1Empty == null && it2Empty == null) {
+			// first run, set iterators to first elements
+			it1Empty = !this.iterator1.nextKey();
+			it2Empty = !this.iterator2.nextKey();
+		}
+
+		if (it1Empty && it2Empty) {
+			return false;
+		}
+		if (it2Empty) {
+			if (outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) {
+				joinLeftKeyValuesWithNull(ite
[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm
[ https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639619#comment-14639619 ] ASF GitHub Bot commented on FLINK-2105: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35379501 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. + * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; + private Boolean it2Empty; + + private OuterJoinType outerJoinType; + + + public AbstractMergeOuterJoinIterator( + OuterJoinType outerJoinType, + MutableObjectIterator input1, + MutableObjectIterator input2, + TypeSerializer serializer1, TypeComparator comparator1, + TypeSerializer serializer2, TypeComparator comparator2, + TypePairComparator pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + + this.outerJoinType = outerJoinType; + } + + /** +* Calls the JoinFunction#match() method for all two key-value pairs that share the same key and come +* from different inputs. 
Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all key-value pairs where no +* matching partner from the other input exists are joined with null. +* The output of the match() method is forwarded. +* +* @throws Exception Forwards all exceptions from the user code and the I/O system. +* @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector) +*/ + @Override + public boolean callWithNextKey(final FlatJoinFunction matchFunction, final Collector collector) throws Exception { + if(it1Empty == null && it2Empty == null) { + //first run, set iterators to first elements + it1Empty = !this.iterator1.nextKey(); + it2Empty = !this.iterator2.nextKey(); + } + + if (it1Empty && it2Empty) { + return false; + } + if (it2Empty) { --- End diff -- Please use `else if` to make it more obvious that these conditions are mutually exclusive. > Implement Sort-Merge Outer Join algorithm
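The branch structure the reviewer asks for can be sketched as follows. This is a simplified, hypothetical skeleton (class and method names are invented for the example), not the actual Flink source:

```java
// Sketch of the reviewer's suggestion: mutually exclusive conditions
// chained with `else if` instead of separate `if` blocks, making the
// exclusivity visible at a glance.
public class BranchSketch {

    // Decides what the merge loop should do next, given which sides
    // of the join are exhausted.
    static String dispatch(boolean it1Empty, boolean it2Empty) {
        if (it1Empty && it2Empty) {
            return "done";               // nothing left on either side
        } else if (it2Empty) {
            return "drain-left";         // only input 1 has keys left
        } else if (it1Empty) {
            return "drain-right";        // only input 2 has keys left
        } else {
            return "merge";              // both sides still have keys
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(true, true));   // done
        System.out.println(dispatch(false, true));  // drain-left
        System.out.println(dispatch(true, false));  // drain-right
        System.out.println(dispatch(false, false)); // merge
    }
}
```

With an `else if` chain, a reader can verify at the syntax level that exactly one branch runs per call.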
[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35379539 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. 
+ * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; + private Boolean it2Empty; + + private OuterJoinType outerJoinType; + + + public AbstractMergeOuterJoinIterator( + OuterJoinType outerJoinType, + MutableObjectIterator input1, + MutableObjectIterator input2, + TypeSerializer serializer1, TypeComparator comparator1, + TypeSerializer serializer2, TypeComparator comparator2, + TypePairComparator pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + + this.outerJoinType = outerJoinType; + } + + /** +* Calls the JoinFunction#match() method for all two key-value pairs that share the same key and come +* from different inputs. Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all key-value pairs where no +* matching partner from the other input exists are joined with null. +* The output of the match() method is forwarded. +* +* @throws Exception Forwards all exceptions from the user code and the I/O system. 
+* @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector) +*/ + @Override + public boolean callWithNextKey(final FlatJoinFunction matchFunction, final Collector collector) throws Exception { + if(it1Empty == null && it2Empty == null) { + //first run, set iterators to first elements + it1Empty = !this.iterator1.nextKey(); + it2Empty = !this.iterator2.nextKey(); + } + + if (it1Empty && it2Empty) { + return false; + } + if (it2Empty) { + if(outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) { + joinLeftKeyValuesWithNull(iterator1.getValues(), matchFunction, collector); + it1Empty = !iterator1.nextKey(); + return true; + }else{ + //consume rest of left side +
[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35379150 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. 
+ * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; + private Boolean it2Empty; + + private OuterJoinType outerJoinType; --- End diff -- Please make `outerJoinType` `final`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
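The `final` suggestion amounts to the pattern below. This is a minimal illustration with invented names; the real iterator class takes many more constructor arguments:

```java
// Sketch: a field that is set exactly once, in the constructor.
// Declaring it `final` makes the compiler reject any later
// reassignment, documenting that the join type is fixed for the
// lifetime of the object.
public class OuterJoinConfig {
    public enum OuterJoinType { LEFT, RIGHT, FULL }

    private final OuterJoinType outerJoinType;

    public OuterJoinConfig(OuterJoinType outerJoinType) {
        this.outerJoinType = outerJoinType;
    }

    public OuterJoinType getOuterJoinType() {
        return outerJoinType;
    }

    public static void main(String[] args) {
        OuterJoinConfig cfg = new OuterJoinConfig(OuterJoinType.FULL);
        System.out.println(cfg.getOuterJoinType());
    }
}
```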
[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm
[ https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639617#comment-14639617 ] ASF GitHub Bot commented on FLINK-2105: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35379150 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. + * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; + private Boolean it2Empty; + + private OuterJoinType outerJoinType; --- End diff -- Please make `outerJoinType` `final`. > Implement Sort-Merge Outer Join algorithm > - > > Key: FLINK-2105 > URL: https://issues.apache.org/jira/browse/FLINK-2105 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Ricky Pogalz >Priority: Minor > Fix For: pre-apache > > > Flink does not natively support outer joins at the moment. > This issue proposes to implement a sort-merge outer join algorithm that can > cover left, right, and full outer joins. > The implementation can be based on the regular sort-merge join iterator > ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}, see also > {{MatchDriver}} class) > The Reusing and NonReusing variants differ in whether object instances are > reused or new objects are created. 
I would start with the NonReusing variant > which is safer from a user's point of view and should also be easier to > implement. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
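The outer-join semantics the issue describes can be illustrated with a toy sort-merge full outer join over two ascending key arrays. This is a sketch with invented names; unlike the real iterator it assumes distinct keys per input and skips the per-key cross-product of value groups:

```java
import java.util.ArrayList;
import java.util.List;

// Toy sort-merge full outer join: both inputs are sorted ascending,
// matching keys are paired, and a key with no partner on the other
// side is joined with null.
public class ToyMergeOuterJoin {

    public static List<Integer[]> fullOuterJoin(int[] left, int[] right) {
        List<Integer[]> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.length && j < right.length) {
            if (left[i] == right[j]) {
                out.add(new Integer[]{left[i], right[j]});  // match
                i++; j++;
            } else if (left[i] < right[j]) {
                out.add(new Integer[]{left[i], null});      // no partner on the right
                i++;
            } else {
                out.add(new Integer[]{null, right[j]});     // no partner on the left
                j++;
            }
        }
        // Drain whichever side still has keys.
        while (i < left.length) out.add(new Integer[]{left[i++], null});
        while (j < right.length) out.add(new Integer[]{null, right[j++]});
        return out;
    }

    public static void main(String[] args) {
        for (Integer[] p : fullOuterJoin(new int[]{1, 2, 4}, new int[]{2, 3, 4})) {
            System.out.println(p[0] + " | " + p[1]);
        }
        // 1 | null
        // 2 | 2
        // null | 3
        // 4 | 4
    }
}
```

A LEFT or RIGHT outer join is the same loop with the null-padding emitted for only one of the two sides.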
[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35378409 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. 
+ * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; + private Boolean it2Empty; + + private OuterJoinType outerJoinType; + + + public AbstractMergeOuterJoinIterator( + OuterJoinType outerJoinType, + MutableObjectIterator input1, + MutableObjectIterator input2, + TypeSerializer serializer1, TypeComparator comparator1, + TypeSerializer serializer2, TypeComparator comparator2, + TypePairComparator pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + + this.outerJoinType = outerJoinType; + } + + /** +* Calls the JoinFunction#match() method for all two key-value pairs that share the same key and come +* from different inputs. Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all key-value pairs where no +* matching partner from the other input exists are joined with null. +* The output of the match() method is forwarded. +* +* @throws Exception Forwards all exceptions from the user code and the I/O system. +* @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector) +*/ + @Override + public boolean callWithNextKey(final FlatJoinFunction matchFunction, final Collector collector) throws Exception { + if(it1Empty == null && it2Empty == null) { --- End diff -- You can check the first call with another `boolean` flag. Checking against a non-initialized variable is not so nice, IMO. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
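The reviewer's alternative can be sketched as below (a hypothetical class; the `nextKey` stand-ins simulate the two sorted inputs):

```java
// Sketch of the suggested first-call check: a primitive boolean flag
// instead of testing Boolean fields against null.
public class FirstCallSketch {
    private boolean initialized = false;
    private boolean it1Empty;
    private boolean it2Empty;

    // Stand-ins for iterator1.nextKey()/iterator2.nextKey(); here we
    // pretend input 1 is exhausted and input 2 still has a key.
    private boolean nextKey1() { return false; }
    private boolean nextKey2() { return true; }

    public boolean callWithNextKey() {
        if (!initialized) {
            // First run: advance both inputs to their first keys.
            it1Empty = !nextKey1();
            it2Empty = !nextKey2();
            initialized = true;
        }
        return !(it1Empty && it2Empty);
    }

    public static void main(String[] args) {
        System.out.println(new FirstCallSketch().callWithNextKey()); // true
    }
}
```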
[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35378056 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. 
+ * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; + private Boolean it2Empty; + + private OuterJoinType outerJoinType; + + + public AbstractMergeOuterJoinIterator( + OuterJoinType outerJoinType, + MutableObjectIterator input1, + MutableObjectIterator input2, + TypeSerializer serializer1, TypeComparator comparator1, + TypeSerializer serializer2, TypeComparator comparator2, + TypePairComparator pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + + this.outerJoinType = outerJoinType; + } + + /** +* Calls the JoinFunction#match() method for all two key-value pairs that share the same key and come +* from different inputs. Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all key-value pairs where no +* matching partner from the other input exists are joined with null. +* The output of the match() method is forwarded. +* +* @throws Exception Forwards all exceptions from the user code and the I/O system. 
+* @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector) +*/ + @Override + public boolean callWithNextKey(final FlatJoinFunction matchFunction, final Collector collector) throws Exception { + if(it1Empty == null && it2Empty == null) { + //first run, set iterators to first elements + it1Empty = !this.iterator1.nextKey(); + it2Empty = !this.iterator2.nextKey(); + } + + if (it1Empty && it2Empty) { + return false; + } + if (it2Empty) { + if(outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) { + joinLeftKeyValuesWithNull(iterator1.getValues(), matchFunction, collector); + it1Empty = !iterator1.nextKey(); + return true; + }else{ + //consume rest of left side +
[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm
[ https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639596#comment-14639596 ] ASF GitHub Bot commented on FLINK-2105: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35377999 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. + * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; + private Boolean it2Empty; + + private OuterJoinType outerJoinType; + + + public AbstractMergeOuterJoinIterator( + OuterJoinType outerJoinType, + MutableObjectIterator input1, + MutableObjectIterator input2, + TypeSerializer serializer1, TypeComparator comparator1, + TypeSerializer serializer2, TypeComparator comparator2, + TypePairComparator pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + + this.outerJoinType = outerJoinType; --- End diff -- please initialize {{it1Empty}} and {{it2Empty}}. 
> Implement Sort-Merge Outer Join algorithm > - > > Key: FLINK-2105 > URL: https://issues.apache.org/jira/browse/FLINK-2105 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Ricky Pogalz >Priority: Minor > Fix For: pre-apache > > > Flink does not natively support outer joins at the moment. > This issue proposes to implement a sort-merge outer join algorithm that can > cover left, right, and full outer joins. > The implementation can be based on the regular sort-merge join iterator > ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}, see also > {{MatchDriver}} class) > The Reusing and NonReusing variants differ in whether object instances are > reused or new objects are created. I would start with the NonReusing variant > which is safer from a user's point of view and should also be easier to > implement. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
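Both field-related comments (initialize the flags, or use a plain `boolean`) guard against the same pitfall: a `Boolean` field defaults to null, and auto-unboxing null in a condition throws at runtime. A minimal demonstration:

```java
// Why an uninitialized Boolean flag is risky: using it in a
// condition auto-unboxes null and throws NullPointerException.
public class NullBooleanPitfall {
    static Boolean it1Empty;  // defaults to null, not false

    public static void main(String[] args) {
        try {
            if (it1Empty) {   // unboxes null -> NullPointerException
                System.out.println("empty");
            }
        } catch (NullPointerException e) {
            System.out.println("NPE: uninitialized Boolean was unboxed");
        }
    }
}
```

A primitive `boolean` (or an explicit initializer) removes the null state entirely.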
[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm
[ https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639598#comment-14639598 ] ASF GitHub Bot commented on FLINK-2105: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35378056 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. + * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; + private Boolean it2Empty; + + private OuterJoinType outerJoinType; + + + public AbstractMergeOuterJoinIterator( + OuterJoinType outerJoinType, + MutableObjectIterator input1, + MutableObjectIterator input2, + TypeSerializer serializer1, TypeComparator comparator1, + TypeSerializer serializer2, TypeComparator comparator2, + TypePairComparator pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + + this.outerJoinType = outerJoinType; + } + + /** +* Calls the JoinFunction#match() method for all two key-value pairs that share the same key and come +* from different inputs. 
Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all key-value pairs where no +* matching partner from the other input exists are joined with null. +* The output of the match() method is forwarded. +* +* @throws Exception Forwards all exceptions from the user code and the I/O system. +* @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector) +*/ + @Override + public boolean callWithNextKey(final FlatJoinFunction matchFunction, final Collector collector) throws Exception { + if(it1Empty == null && it2Empty == null) { + //first run, set iterators to first elements + it1Empty = !this.iterator1.nextKey(); + it2Empty = !this.iterator2.nextKey(); + } + + if (it1Empty && it2Empty) { + return false; + } + if (it2Empty) { + if(outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) { + joinLeftKeyValuesWithNull(ite
[jira] [Commented] (FLINK-2105) Implement Sort-Merge Outer Join algorithm
[ https://issues.apache.org/jira/browse/FLINK-2105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639595#comment-14639595 ] ASF GitHub Bot commented on FLINK-2105: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35377964 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. + * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; --- End diff -- why aren't you using the {{boolean}} primitive type? > Implement Sort-Merge Outer Join algorithm > - > > Key: FLINK-2105 > URL: https://issues.apache.org/jira/browse/FLINK-2105 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Ricky Pogalz >Priority: Minor > Fix For: pre-apache > > > Flink does not natively support outer joins at the moment. > This issue proposes to implement a sort-merge outer join algorithm that can > cover left, right, and full outer joins. > The implementation can be based on the regular sort-merge join iterator > ({{ReusingMergeMatchIterator}} and {{NonReusingMergeMatchIterator}}, see also > {{MatchDriver}} class) > The Reusing and NonReusing variants differ in whether object instances are > reused or new objects are created. 
I would start with the NonReusing variant > which is safer from a user's point of view and should also be easier to > implement. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35377999 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. 
+ * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; + private Boolean it2Empty; + + private OuterJoinType outerJoinType; + + + public AbstractMergeOuterJoinIterator( + OuterJoinType outerJoinType, + MutableObjectIterator input1, + MutableObjectIterator input2, + TypeSerializer serializer1, TypeComparator comparator1, + TypeSerializer serializer2, TypeComparator comparator2, + TypePairComparator pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + + this.outerJoinType = outerJoinType; --- End diff -- please initialize {{it1Empty}} and {{it2Empty}}. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2105] Implement Sort-Merge Outer Join a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/907#discussion_r35377964 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. 
+ * */ +public abstract class AbstractMergeOuterJoinIterator extends AbstractMergeIterator { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private Boolean it1Empty; --- End diff -- why aren't you using the {{boolean}} primitive type?
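The two review comments on this class (initialize {{it1Empty}}/{{it2Empty}}, and use the {{boolean}} primitive instead of the nullable {{Boolean}} wrapper) can be addressed together. Below is a minimal, hypothetical sketch of just the empty-input bookkeeping, not the actual Flink iterator: the wrapper types and the "null means first run" convention are replaced by eagerly initialized primitives plus an explicit first-run flag.

```java
// Hypothetical simplification of the reviewed iterator's state handling
// (not the actual AbstractMergeOuterJoinIterator): primitive booleans,
// initialized eagerly, plus an explicit flag replacing the null check.
class OuterJoinState {
    private boolean it1Empty = false;   // initialized eagerly, as requested
    private boolean it2Empty = false;
    private boolean initialized = false;

    /** Records whether each input had a first key; returns true on the first call only. */
    boolean advanceIfFirstRun(boolean input1HasKey, boolean input2HasKey) {
        if (initialized) {
            return false;
        }
        it1Empty = !input1HasKey;
        it2Empty = !input2HasKey;
        initialized = true;
        return true;
    }

    /** Both inputs exhausted: the merge loop can terminate. */
    boolean bothEmpty() {
        return it1Empty && it2Empty;
    }
}
```

With this shape, {{callWithNextKey()}} would test the flag instead of comparing wrapper objects against {{null}}, which also avoids accidental autoboxing.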
[jira] [Commented] (FLINK-2394) HadoopOutFormat OutputCommitter is default to FileOutputCommiter
[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639545#comment-14639545 ] Fabian Hueske commented on FLINK-2394: -- Hadoop handles {{OutputCommitters}} differently for both of its APIs ({{mapred}} and {{mapreduce}}). - For the newer {{mapreduce}} API, things are quite simple. {{org.apache.hadoop.mapreduce.OutputFormat}} has an abstract method {{getOutputCommitter(TaskAttemptContext context)}} which provides the output committer that must be used. So updating the Flink {{mapreduce.HadoopOutputFormatBase}} class such that it gets the {{OutputCommitter}} directly from the {{OutputFormat}} instead of creating a {{FileOutputCommitter}} should be easy. - For the older {{mapred}} API, the OutputFormat interface does not define a {{getOutputCommitter()}} method. Instead, the {{JobConf}} has a {{getOutputCommitter()}} method which returns a {{FileOutputCommitter}} if nothing else is defined. So we could update the {{mapred.HadoopOutputFormatBase}} to get the {{OutputCommitter}} from the {{JobConf}}. However, this would require that users manually set a different {{OutputCommitter}} in the {{JobConf}}, which is not really intuitive. We could offer an additional constructor which sets an {{OutputCommitter}} in the provided {{JobConf}} argument, as [~stefano.bortoli] suggested. > HadoopOutFormat OutputCommitter is default to FileOutputCommiter > > > Key: FLINK-2394 > URL: https://issues.apache.org/jira/browse/FLINK-2394 > Project: Flink > Issue Type: Bug > Components: Hadoop Compatibility >Affects Versions: 0.9.0 >Reporter: Stefano Bortoli > > MongoOutputFormat does not write back in collection because the > HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and > is set as default to FileOutputCommitter. Therefore, on close and > globalFinalize execution the commit does not happen and mongo collection > stays untouched. 
> A simple solution would be to: > 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat > that gets the OutputCommitter as a parameter > 2 - change the outputCommitter field of HadoopOutputFormatBase to be a > generic OutputCommitter > 3 - remove the default assignment in the open() and finalizeGlobal to the > outputCommitter to FileOutputCommitter(), or keep it as a default in case of > no specific assignment. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
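The three proposal items in the description (constructor parameter, generically typed committer field, fallback default) can be sketched in isolation. The types below ({{Committer}}, {{FileCommitter}}) are stand-ins, not the real Hadoop or Flink classes; the point is only the wiring: an explicitly supplied committer wins, and {{FileOutputCommitter}} remains the default when nothing is supplied.

```java
// Stand-in sketch of proposal items 1-3 (all type names here are
// hypothetical, not the actual Hadoop/Flink API):
class HadoopOutputFormatSketch {

    /** Stand-in for org.apache.hadoop's OutputCommitter hierarchy. */
    interface Committer { String name(); }

    static final class FileCommitter implements Committer {
        public String name() { return "FileOutputCommitter"; }
    }

    private final Committer committer;   // item 2: generically typed field

    /** Item 3: keep FileOutputCommitter as the default when nothing is set. */
    HadoopOutputFormatSketch() {
        this(new FileCommitter());
    }

    /** Item 1: new constructor taking the committer as a parameter. */
    HadoopOutputFormatSketch(Committer committer) {
        this.committer = committer;
    }

    String committerName() {
        return committer.name();
    }
}
```

For the {{mapreduce}} API the committer would come from the format itself rather than a constructor argument, as described above; this sketch corresponds to the {{mapred}}-style case where the user must supply one.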
[jira] [Created] (FLINK-2403) Rework partitioned state checkpointing logic
Gyula Fora created FLINK-2403: - Summary: Rework partitioned state checkpointing logic Key: FLINK-2403 URL: https://issues.apache.org/jira/browse/FLINK-2403 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.10 Reporter: Gyula Fora In case of partitioned states, states are stored per-key (creating a statehandle for each key) so that we can later use this for repartitioning without fetching all the states. While this mechanism might be desirable later, it is extremely inefficient if we use a state backend such as HDFS, as we are creating many small files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Reopened] (FLINK-2152) Provide zipWithIndex utility in flink-contrib
[ https://issues.apache.org/jira/browse/FLINK-2152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johannes Günther reopened FLINK-2152: - When using zipWithIndex in a longer-running job, the following error occurs: java.lang.Exception: The user defined 'open()' method caused an exception: null at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:366) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.ConcurrentModificationException at java.util.ArrayList.sort(ArrayList.java:1456) at java.util.Collections.sort(Collections.java:175) at org.apache.flink.api.java.utils.DataSetUtils$2.open(DataSetUtils.java:80) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:492) ... 3 more org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43) at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:93) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: null at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:366) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.ConcurrentModificationException at java.util.ArrayList.sort(ArrayList.java:1456) at java.util.Collections.sort(Collections.java:175) at 
org.apache.flink.api.java.utils.DataSetUtils$2.open(DataSetUtils.java:80) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:492) ... 3 more This can be fixed by wrapping the counts variable in a concurrent list, e.g. a CopyOnWriteArrayList from java.util.concurrent, in the open() method. > Provide zipWithIndex utility in flink-contrib > - > > Key: FLINK-2152 > URL: https://issues.apache.org/jira/browse/FLINK-2152 > Project: Flink > Issue Type: Improvement > Components: Java API >Reporter: Robert Metzger >Assignee: Andra Lungu >Priority: Trivial > Labels: starter > Fix For: 0.10 > > > We should provide a simple utility method for zipping elements in a data set > with a dense index. > It's up for discussion whether we want it directly in the API or if we should > provide it only as a utility from {{flink-contrib}}. > I would put it in {{flink-contrib}}. > See my answer on SO: > http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink -- This message was sent by Atlassian JIRA (v6.3.4#6332)
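The reporter's suggestion is a {{CopyOnWriteArrayList}}; one caveat is that on older JDKs, {{Collections.sort}} over a copy-on-write list can itself throw {{UnsupportedOperationException}}, because the copy-on-write iterator does not support {{set()}}. An alternative that avoids mutating the shared list at all is to sort a defensive snapshot in {{open()}}. The sketch below is illustrative only (the names are not the actual {{DataSetUtils}} code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative fix sketch: sort a snapshot copy of the shared list so a
// concurrent writer cannot trigger a ConcurrentModificationException
// mid-sort, and the shared list is never reordered under readers.
class SnapshotSort {

    /** Returns a sorted snapshot; the shared list itself is never mutated. */
    static List<Integer> sortedSnapshot(List<Integer> shared) {
        List<Integer> copy;
        synchronized (shared) {          // guard the copy against concurrent writes
            copy = new ArrayList<>(shared);
        }
        Collections.sort(copy);
        return copy;
    }
}
```

Whether the snapshot or the concurrent-list variant fits better depends on how the {{counts}} variable is written to elsewhere, which this digest does not show.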
[jira] [Commented] (FLINK-2391) Storm-compatibility:method FlinkTopologyBuilder.createTopology() throws java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-2391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639389#comment-14639389 ] ASF GitHub Bot commented on FLINK-2391: --- Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/932#issuecomment-124218025 Your second commit reverted your first commit. I don't see any changes. "File changed 0". > Storm-compatibility:method FlinkTopologyBuilder.createTopology() throws > java.lang.NullPointerException > -- > > Key: FLINK-2391 > URL: https://issues.apache.org/jira/browse/FLINK-2391 > Project: Flink > Issue Type: Bug > Components: flink-contrib >Affects Versions: 0.10 > Environment: win7 32bit;linux >Reporter: Huang Wei > Labels: features > Fix For: 0.10 > > Original Estimate: 168h > Remaining Estimate: 168h > > core dumped at FlinkOutputFieldsDeclarer.java : 160(package > FlinkOutputFieldsDeclarer). > code : fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i)); > in this line, the var this.outputSchema may be null. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2391]Fix Storm-compatibility FlinkTopol...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/932#issuecomment-124218025 Your second commit reverted your first commit. I don't see any changes. "File changed 0".
[jira] [Commented] (FLINK-2398) Decouple StreamGraph Building from the API
[ https://issues.apache.org/jira/browse/FLINK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639107#comment-14639107 ] Aljoscha Krettek commented on FLINK-2398: - But what would go into {{streaming-core}} and {{streaming-runtime}}? {{streaming-core}} is, right now, a (more or less) small layer on top of {{flink-core}} and {{flink-runtime}}. It is not very clear what would go where in such a separation, IMHO. > Decouple StreamGraph Building from the API > -- > > Key: FLINK-2398 > URL: https://issues.apache.org/jira/browse/FLINK-2398 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > Currently, the building of the StreamGraph is very intertwined with the API > methods. DataStream knows about the StreamGraph and keeps track of splitting, > selected names, unions and so on. This leads to the problem that it is very > hard to understand how the StreamGraph is built because the code that does it > is all over the place. This also makes it hard to extend/change parts of the > Streaming system. > I propose to introduce "Transformations". A transformation holds information > about one operation: The input streams, types, names, operator and so on. An > API method creates a transformation instead of fiddling with the StreamGraph > directly. A new component, the StreamGraphGenerator, creates a StreamGraph > from the tree of transformations that result from program specification using > the API methods. This would relieve DataStream of knowing about the > StreamGraph and makes unions, splitting, selection visible transformations > instead of being scattered across the different API classes as fields. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2402) Add a non-blocking BarrierTracker
Stephan Ewen created FLINK-2402: --- Summary: Add a non-blocking BarrierTracker Key: FLINK-2402 URL: https://issues.apache.org/jira/browse/FLINK-2402 Project: Flink Issue Type: Sub-task Components: Streaming Affects Versions: 0.10 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 0.10 This issue would add a new tracker for barriers that simply tracks what barriers have been observed from which inputs. It never blocks off buffers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2398) Decouple StreamGraph Building from the API
[ https://issues.apache.org/jira/browse/FLINK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639094#comment-14639094 ] Stephan Ewen commented on FLINK-2398: - What would be great down the line is the following: - Have {{streaming-core}} and {{streaming-example}} independent of the runtime, much like for the batch API. - Add the runtime-related streaming classes to {{flink-runtime}}. > Decouple StreamGraph Building from the API > -- > > Key: FLINK-2398 > URL: https://issues.apache.org/jira/browse/FLINK-2398 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > Currently, the building of the StreamGraph is very intertwined with the API > methods. DataStream knows about the StreamGraph and keeps track of splitting, > selected names, unions and so on. This leads to the problem that it is very > hard to understand how the StreamGraph is built because the code that does it > is all over the place. This also makes it hard to extend/change parts of the > Streaming system. > I propose to introduce "Transformations". A transformation holds information > about one operation: The input streams, types, names, operator and so on. An > API method creates a transformation instead of fiddling with the StreamGraph > directly. A new component, the StreamGraphGenerator, creates a StreamGraph > from the tree of transformations that result from program specification using > the API methods. This would relieve DataStream of knowing about the > StreamGraph and makes unions, splitting, selection visible transformations > instead of being scattered across the different API classes as fields. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
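The "Transformations" proposal in the issue description can be sketched in miniature. The class names below are hypothetical, not the eventual Flink API: API calls build a tree of transformation nodes, and a separate generator walks that tree once to emit graph nodes, so the API layer never touches the graph directly.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative-only sketch of the proposed split (hypothetical names):
// Transformation = one operation with its inputs; generate() plays the
// role of the StreamGraphGenerator, visiting each node exactly once.
class TransformationSketch {

    static class Transformation {
        final String name;
        final List<Transformation> inputs = new ArrayList<>();

        Transformation(String name, Transformation... ins) {
            this.name = name;
            for (Transformation t : ins) inputs.add(t);
        }
    }

    /** Depth-first walk from the sink, emitting inputs before consumers. */
    static List<String> generate(Transformation sink) {
        Set<Transformation> seen = new LinkedHashSet<>();
        visit(sink, seen);
        List<String> nodes = new ArrayList<>();
        for (Transformation t : seen) nodes.add(t.name);
        return nodes;
    }

    private static void visit(Transformation t, Set<Transformation> seen) {
        if (!seen.contains(t)) {
            for (Transformation in : t.inputs) visit(in, seen);
            seen.add(t);   // inputs first, then the node itself
        }
    }
}
```

The real design would carry types, names, and operators on each node, but the structural idea (API builds a tree, a single generator pass builds the graph) is the same.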
[jira] [Resolved] (FLINK-2385) Scala DataSet.distinct should have parenthesis
[ https://issues.apache.org/jira/browse/FLINK-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-2385. - Resolution: Fixed Fixed via 5d475f88784f796e054d7cff2492b9989d3f47d6 > Scala DataSet.distinct should have parenthesis > -- > > Key: FLINK-2385 > URL: https://issues.apache.org/jira/browse/FLINK-2385 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.10 > > > The method is not a side-effect free accessor, but defines heavy computation, > even if it does not mutate the original data set. > This is a somewhat API breaking change. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2385) Scala DataSet.distinct should have parenthesis
[ https://issues.apache.org/jira/browse/FLINK-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-2385. --- > Scala DataSet.distinct should have parenthesis > -- > > Key: FLINK-2385 > URL: https://issues.apache.org/jira/browse/FLINK-2385 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.10 > > > The method is not a side-effect free accessor, but defines heavy computation, > even if it does not mutate the original data set. > This is a somewhat API breaking change. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2385) Scala DataSet.distinct should have parenthesis
[ https://issues.apache.org/jira/browse/FLINK-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639041#comment-14639041 ] ASF GitHub Bot commented on FLINK-2385: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/933 > Scala DataSet.distinct should have parenthesis > -- > > Key: FLINK-2385 > URL: https://issues.apache.org/jira/browse/FLINK-2385 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.10 > > > The method is not a side-effect free accessor, but defines heavy computation, > even if it does not mutate the original data set. > This is a somewhat API breaking change. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2385] [scala api] [api breaking] Add pa...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/933
[jira] [Created] (FLINK-2401) Replace ActorRefs with ActorGateway in web server
Till Rohrmann created FLINK-2401: Summary: Replace ActorRefs with ActorGateway in web server Key: FLINK-2401 URL: https://issues.apache.org/jira/browse/FLINK-2401 Project: Flink Issue Type: Improvement Reporter: Till Rohrmann Priority: Minor The web server is the only remaining component which uses {{ActorRefs}} directly to communicate with Flink actors. They should be replaced by {{ActorGateways}} which allow the automatic decoration of messages. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2332) Assign session IDs to JobManager and TaskManager messages
[ https://issues.apache.org/jira/browse/FLINK-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-2332. Resolution: Fixed Added via 45428518d0e1b843947a6184b4a803a78ad5 > Assign session IDs to JobManager and TaskManager messages > - > > Key: FLINK-2332 > URL: https://issues.apache.org/jira/browse/FLINK-2332 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 0.10 > > > In order to support true high availability {{TaskManager}} and {{JobManager}} > have to be able to distinguish whether a message was sent from the leader or > whether a message was sent from a former leader. Messages which come from a > former leader have to be discarded in order to guarantee a consistent state. > A way to achieve this is to assign a leader session ID to a {{JobManager}} > once it is elected leader. This leader session ID is sent to the > {{TaskManager}} upon registration at the {{JobManager}}. All subsequent > messages should then be decorated with this leader session ID to mark them as > valid. On the {{TaskManager}} side, the leader session ID received as a > response to the registration message can then be used to validate incoming > messages. > The same holds true for registration messages which should have a > registration session ID, too. That way, it is possible to distinguish invalid > registration messages from valid ones. The registration session ID can be > assigned once the TaskManager is notified about the new leader. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/917 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2332) Assign session IDs to JobManager and TaskManager messages
[ https://issues.apache.org/jira/browse/FLINK-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639028#comment-14639028 ] ASF GitHub Bot commented on FLINK-2332: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/917
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-124143378 Tests pass on Travis. Will merge it.
[jira] [Commented] (FLINK-2332) Assign session IDs to JobManager and TaskManager messages
[ https://issues.apache.org/jira/browse/FLINK-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639018#comment-14639018 ] ASF GitHub Bot commented on FLINK-2332: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-124143378 Tests pass on Travis. Will merge it.
[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638985#comment-14638985 ] Till Rohrmann commented on FLINK-2392: -- The same happened to me: https://s3.amazonaws.com/archive.travis-ci.org/jobs/72303347/log.txt > Instable test in flink-yarn-tests > - > > Key: FLINK-2392 > URL: https://issues.apache.org/jira/browse/FLINK-2392 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Matthias J. Sax >Priority: Minor > > The test YARNSessionFIFOITCase fails from time to time on an irregular basis. > For example see: https://travis-ci.org/apache/flink/jobs/72019690 > Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec > <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase > perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) > Time elapsed: 60.651 sec <<< FAILURE! > java.lang.AssertionError: During the timeout period of 60 seconds the > expected string did not show up > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435) > Results : > Failed tests: > > YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478 > During the timeout period of 60 seconds the expected string did not show up -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2398) Decouple StreamGraph Building from the API
[ https://issues.apache.org/jira/browse/FLINK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638935#comment-14638935 ] Aljoscha Krettek commented on FLINK-2398: - Yes, I think this is a prerequisite for several things we're working on. I've started fiddling around with this and I have it working except for split/select and iterations. I think the first step is to remove the dependency on StreamGraph from DataStream and the other API classes. Once this is done we can change the representation to make it work for both stream and batch. In the code I have right now, the transform() method of DataStream simply creates a OneInputTransformation, transform() on ConnectedDataStream creates a TwoInputTransformation, union creates a UnionTransformation, and so on... > Decouple StreamGraph Building from the API > -- > > Key: FLINK-2398 > URL: https://issues.apache.org/jira/browse/FLINK-2398 > Project: Flink > Issue Type: Improvement > Components: Streaming > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > > Currently, the building of the StreamGraph is deeply intertwined with the API methods. DataStream knows about the StreamGraph and keeps track of splitting, selected names, unions, and so on. This makes it very hard to understand how the StreamGraph is built, because the code that does it is all over the place. It also makes it hard to extend or change parts of the streaming system. > I propose to introduce "Transformations". A transformation holds information about one operation: the input streams, types, names, operator, and so on. An API method creates a transformation instead of fiddling with the StreamGraph directly. A new component, the StreamGraphGenerator, creates a StreamGraph from the tree of transformations that results from program specification using the API methods. This would relieve DataStream from knowing about the StreamGraph and would make unions, splitting, and selection visible transformations instead of being scattered across the different API classes as fields.
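The proposal above can be sketched roughly like this. The class names echo the ones mentioned in the comment, but the code is an illustrative assumption, not Flink's implementation: API methods only build a tree of transformations, and a separate generator later walks that tree to emit graph nodes.

```java
import java.util.ArrayList;
import java.util.List;

// A transformation holds information about one operation and its inputs.
abstract class Transformation {
    final String name;
    final List<Transformation> inputs = new ArrayList<>();

    Transformation(String name) {
        this.name = name;
    }
}

final class SourceTransformation extends Transformation {
    SourceTransformation(String name) {
        super(name);
    }
}

final class OneInputTransformation extends Transformation {
    OneInputTransformation(String name, Transformation input) {
        super(name);
        inputs.add(input);
    }
}

final class StreamGraphGenerator {
    // Walks the tree from the sink upward and emits each node once, in
    // topological order -- a stand-in for building the actual StreamGraph.
    static List<String> generate(Transformation sink) {
        List<String> order = new ArrayList<>();
        visit(sink, order);
        return order;
    }

    private static void visit(Transformation t, List<String> order) {
        for (Transformation input : t.inputs) {
            visit(input, order);
        }
        if (!order.contains(t.name)) {
            order.add(t.name);
        }
    }
}
```

With this split, DataStream-style API methods would only ever construct `Transformation` objects; everything graph-related stays inside the generator.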
[jira] [Resolved] (FLINK-2400) TaskTest.testExecutionFailesAfterCanceling fails
[ https://issues.apache.org/jira/browse/FLINK-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-2400. Resolution: Later Changed the error message via fa78be6. We have to check whether this occurs again or not. > TaskTest.testExecutionFailesAfterCanceling fails > > > Key: FLINK-2400 > URL: https://issues.apache.org/jira/browse/FLINK-2400 > Project: Flink > Issue Type: Bug > Components: Tests > Affects Versions: 0.9, master > Reporter: Ufuk Celebi > Priority: Minor > Fix For: 0.10 > > > The following test failed on Travis: > TaskTest.testExecutionFailesAfterCanceling:561->validateUnregisterTask:774 > TaskManager message is not 'UnregisterTask' > https://travis-ci.org/uce/flink/jobs/72292328 > I think it's some flakiness in the test. It's the first time I saw this failing. From the logs, I couldn't tell what went wrong. In the test there is a timeout for the expected message to arrive. Maybe that's the issue, [~StephanEwen]?
[jira] [Commented] (FLINK-2400) TaskTest.testExecutionFailesAfterCanceling fails
[ https://issues.apache.org/jira/browse/FLINK-2400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638913#comment-14638913 ] Stephan Ewen commented on FLINK-2400: - Would that error be triggered on timeout? The first step should be to make the error better. Log if it is a timeout, or what message it was, if it is an unexpected message.
[jira] [Created] (FLINK-2400) TaskTest.testExecutionFailesAfterCanceling fails
Ufuk Celebi created FLINK-2400: -- Summary: TaskTest.testExecutionFailesAfterCanceling fails Key: FLINK-2400 URL: https://issues.apache.org/jira/browse/FLINK-2400 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 0.9, master Reporter: Ufuk Celebi Priority: Minor Fix For: 0.10 The following test failed on Travis: TaskTest.testExecutionFailesAfterCanceling:561->validateUnregisterTask:774 TaskManager message is not 'UnregisterTask' https://travis-ci.org/uce/flink/jobs/72292328 I think it's some flakeyness in the test. It's the first time I saw this failing. From the logs, I couldn't tell what went wrong. In the test there is a timeout for the expected message to arrive. Maybe that's the issue, [~StephanEwen]? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-2397) Unify two backend servers to one server
[ https://issues.apache.org/jira/browse/FLINK-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-2397. - Resolution: Fixed Fixed via c52e753a8d3fc15ed48df7bfa24a327a90df9a0f > Unify two backend servers to one server > --- > > Key: FLINK-2397 > URL: https://issues.apache.org/jira/browse/FLINK-2397 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.10 > > > Currently, the new dashboard still needs both the old backend server and the > new backend server. We need to migrate the requests from {{/jobsInfo}} to the > new server. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2397) Unify two backend servers to one server
[ https://issues.apache.org/jira/browse/FLINK-2397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-2397. ---
[jira] [Commented] (FLINK-2398) Decouple StreamGraph Building from the API
[ https://issues.apache.org/jira/browse/FLINK-2398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638891#comment-14638891 ] Gyula Fora commented on FLINK-2398: --- I suggest we design the API in a way that it will make it possible to unify batch and streaming programs in the future. So we should pick a representation that will work for both so we can modify the batch api later. (this will probably mean the rewriting of the StreamGraph into some common representation at some point)
[jira] [Created] (FLINK-2399) Fail when actor versions don't match
Ufuk Celebi created FLINK-2399: -- Summary: Fail when actor versions don't match Key: FLINK-2399 URL: https://issues.apache.org/jira/browse/FLINK-2399 Project: Flink Issue Type: Improvement Components: JobManager, TaskManager Affects Versions: 0.9, master Reporter: Ufuk Celebi Priority: Minor Fix For: 0.10 Problem: there can be subtle errors when actors from different Flink versions communicate with each other, for example when an old client (e.g. Flink 0.9) communicates with a new JobManager (e.g. Flink 0.10-SNAPSHOT). We can check that the versions match on first communication between the actors and fail if they don't match. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
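The proposed check could look roughly like this. This is a sketch under the assumption that both actors can exchange a version string on first contact; the class and method names are hypothetical, not actual Flink code.

```java
// Hypothetical handshake check: on first communication, the local actor
// compares its own Flink version against the remote actor's version and
// fails fast on a mismatch instead of risking subtle protocol errors.
final class VersionHandshake {
    static void check(String localVersion, String remoteVersion) {
        if (!localVersion.equals(remoteVersion)) {
            throw new IllegalStateException(
                    "Flink version mismatch: local " + localVersion
                            + ", remote " + remoteVersion);
        }
    }
}
```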
[jira] [Commented] (FLINK-2205) Confusing entries in JM Webfrontend Job Configuration section
[ https://issues.apache.org/jira/browse/FLINK-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638815#comment-14638815 ] ASF GitHub Bot commented on FLINK-2205: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/927#issuecomment-124109471 I think you can actually add the code to the current UI. It is not going to disappear in the near future, it will probably stay as a fallback variant for one more version. You can add this functionality there. Adding it to the new web frontend is a separate effort. If you want, you can be involved there as well. > Confusing entries in JM Webfrontend Job Configuration section > - > > Key: FLINK-2205 > URL: https://issues.apache.org/jira/browse/FLINK-2205 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 0.9 >Reporter: Fabian Hueske >Priority: Minor > Labels: starter > > The Job Configuration section of the job history / analyze page of the > JobManager webinterface contains two confusing entries: > - {{Number of execution retries}} is actually the maximum number of retries > and should be renamed accordingly. The default value is -1 and should be > changed to "deactivated" (or 0). > - {{Job parallelism}} which is -1 by default. A parallelism of -1 is not very > meaningful. It would be better to show something like "auto" -- This message was sent by Atlassian JIRA (v6.3.4#6332)
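The two changes the ticket asks for can be sketched as simple display formatting (a hypothetical helper, not the web frontend's actual code): render the magic default value -1 as a readable label instead of showing it raw.

```java
// Hypothetical formatting for the JM web frontend's Job Configuration
// section, following the suggestions in the ticket.
final class ConfigDisplay {
    // "Number of execution retries" is really the MAXIMUM number of
    // retries; -1 (or 0) means retries are deactivated.
    static String formatMaxRetries(int retries) {
        return retries <= 0 ? "deactivated" : Integer.toString(retries);
    }

    // A job parallelism of -1 means "chosen automatically".
    static String formatParallelism(int parallelism) {
        return parallelism == -1 ? "auto" : Integer.toString(parallelism);
    }
}
```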
[GitHub] flink pull request: [FLINK-2205] Fix confusing entries in JM UI Jo...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/927#issuecomment-124109471 I think you can actually add the code to the current UI. It is not going to disappear in the near future, it will probably stay as a fallback variant for one more version. You can add this functionality there. Adding it to the new web frontend is a separate effort. If you want, you can be involved there as well.
[jira] [Created] (FLINK-2398) Decouple StreamGraph Building from the API
Aljoscha Krettek created FLINK-2398: --- Summary: Decouple StreamGraph Building from the API Key: FLINK-2398 URL: https://issues.apache.org/jira/browse/FLINK-2398 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Currently, the building of the StreamGraph is very intertwined with the API methods. DataStream knows about the StreamGraph and keeps track of splitting, selected names, unions and so on. This leads to the problem that is is very hard to understand how the StreamGraph is built because the code that does it is all over the place. This also makes it hard to extend/change parts of the Streaming system. I propose to introduce "Transformations". A transformation hold information about one operation: The input streams, types, names, operator and so on. An API method creates a transformation instead of fiddling with the StreamGraph directly. A new component, the StreamGraphGenerator creates a StreamGraph from the tree of transformations that result from program specification using the API methods. This would relieve DataStream from knowing about the StreamGraph and makes unions, splitting, selection visible transformations instead of being scattered across the different API classes as fields. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2385) Scala DataSet.distinct should have parenthesis
[ https://issues.apache.org/jira/browse/FLINK-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638766#comment-14638766 ] ASF GitHub Bot commented on FLINK-2385: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/933#issuecomment-124088349 Nope. +1 from my side. > Scala DataSet.distinct should have parenthesis > -- > > Key: FLINK-2385 > URL: https://issues.apache.org/jira/browse/FLINK-2385 > Project: Flink > Issue Type: Bug > Components: Scala API >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.10 > > > The method is not a side-effect free accessor, but defines heavy computation, > even if it does not mutate the original data set. > This is a somewhat API breaking change. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2385] [scala api] [api breaking] Add pa...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/933#issuecomment-124088349 Nope. +1 from my side.
[jira] [Commented] (FLINK-2385) Scala DataSet.distinct should have parenthesis
[ https://issues.apache.org/jira/browse/FLINK-2385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638757#comment-14638757 ] ASF GitHub Bot commented on FLINK-2385: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/933#issuecomment-124086584 Any reservation against this? (The one failed build profile is due to a Travis hiccup)
[GitHub] flink pull request: [FLINK-2385] [scala api] [api breaking] Add pa...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/933#issuecomment-124086584 Any reservation against this? (The one failed build profile is due to a Travis hiccup)
[jira] [Created] (FLINK-2397) Unify two backend servers to one server
Stephan Ewen created FLINK-2397: --- Summary: Unify two backend servers to one server Key: FLINK-2397 URL: https://issues.apache.org/jira/browse/FLINK-2397 Project: Flink Issue Type: Sub-task Components: Webfrontend Affects Versions: 0.10 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 0.10 Currently, the new dashboard still needs both the old backend server and the new backend server. We need to migrate the requests from {{/jobsInfo}} to the new server. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2389) Add dashboard frontend architecture amd build infrastructue
[ https://issues.apache.org/jira/browse/FLINK-2389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-2389. --- > Add dashboard frontend architecture amd build infrastructue > --- > > Key: FLINK-2389 > URL: https://issues.apache.org/jira/browse/FLINK-2389 > Project: Flink > Issue Type: Sub-task > Components: Webfrontend >Affects Versions: 0.10 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 0.10 > > > Add the build infrastructure and basic libraries for a modern and modular web > dashboard. > The dashboard is going to be built using angular.js. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638617#comment-14638617 ] Chengxiang Li commented on FLINK-1901: -- Thanks for the analysis, [~trohrm...@apache.org], I've created FLINK-2396 as a followup work to support sample and other similar datasets in iteration. > Create sample operator for Dataset > -- > > Key: FLINK-1901 > URL: https://issues.apache.org/jira/browse/FLINK-1901 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Theodore Vasiloudis >Assignee: Chengxiang Li > > In order to be able to implement Stochastic Gradient Descent and a number of > other machine learning algorithms we need to have a way to take a random > sample from a Dataset. > We need to be able to sample with or without replacement from the Dataset, > choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2341) Deadlock in SpilledSubpartitionViewAsyncIO
[ https://issues.apache.org/jira/browse/FLINK-2341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638613#comment-14638613 ] Ufuk Celebi commented on FLINK-2341: I think this is an issue with the test. In general, I think that the asynchronous reader variants were premature in the first place (I did this). They add quite some complexity and their merit is unmeasured performance-wise. We will probably not recommend anyone to use this variant at the moment. I am thinking about whether it is better to just remove the async reader variants and take it up only if it becomes necessary. > Deadlock in SpilledSubpartitionViewAsyncIO > -- > > Key: FLINK-2341 > URL: https://issues.apache.org/jira/browse/FLINK-2341 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Affects Versions: 0.9, 0.10 >Reporter: Stephan Ewen >Assignee: Ufuk Celebi >Priority: Critical > Fix For: 0.9, 0.10 > > > It may be that the deadlock is because of the way the > {{SpilledSubpartitionViewTest}} is written > {code} > Found one Java-level deadlock: > = > "pool-25-thread-2": > waiting to lock monitor 0x7f66f4932468 (object 0xfa1478f0, a > java.lang.Object), > which is held by "IOManager reader thread #1" > "IOManager reader thread #1": > waiting to lock monitor 0x7f66f4931160 (object 0xfa029768, a > java.lang.Object), > which is held by "pool-25-thread-2" > Java stack information for the threads listed above: > === > "pool-25-thread-2": > at > org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.notifyError(SpilledSubpartitionViewAsyncIO.java:304) > - waiting to lock <0xfa1478f0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.onAvailableBuffer(SpilledSubpartitionViewAsyncIO.java:256) > at > org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$300(SpilledSubpartitionViewAsyncIO.java:42) > at > 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:367) > at > org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$BufferProviderCallback.onEvent(SpilledSubpartitionViewAsyncIO.java:353) > at > org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:135) > - locked <0xfa029768> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119) > - locked <0xfa3a1a20> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:95) > at > org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer.call(TestSubpartitionConsumer.java:39) > at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334) > at java.util.concurrent.FutureTask.run(FutureTask.java:166) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:701) > "IOManager reader thread #1": > at > org.apache.flink.runtime.io.network.util.TestPooledBufferProvider$PooledBufferProviderRecycler.recycle(TestPooledBufferProvider.java:127) > - waiting to lock <0xfa029768> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.buffer.Buffer.recycle(Buffer.java:119) > - locked <0xfa3a1ea0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.returnBufferFromIOThread(SpilledSubpartitionViewAsyncIO.java:270) > - locked <0xfa1478f0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO.access$100(SpilledSubpartitionViewAsyncIO.java:42) > at > 
org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:338) > at > org.apache.flink.runtime.io.network.partition.SpilledSubpartitionViewAsyncIO$IOThreadCallback.requestSuccessful(SpilledSubpartitionViewAsyncIO.java:328) > at > org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.handleProcessedBuffer(AsynchronousFileIOChannel.java:199) > at > org.apache.flink.runtime.io.disk.iomanager.BufferReadRequest.requestDone(AsynchronousFileIOChannel.java:431) > at > org.apache.flink.r
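The trace above is a classic lock-order inversion: one thread holds the buffer pool's monitor and waits for the view's monitor, while the other thread holds them in the opposite order. Independent of whether the bug is in the test or in the reader (and of Flink's actual classes), the generic fix pattern can be illustrated in a few lines: acquire the two monitors in one global order, so the cycle cannot form.

```java
// Generic illustration (hypothetical class, not Flink code) of consistent
// lock ordering: both code paths take viewLock before poolLock, so the
// hold-and-wait cycle from the trace above is impossible.
final class OrderedLocks {
    private final Object viewLock = new Object();
    private final Object poolLock = new Object();
    private int recycled = 0;

    // Path 1, e.g. a consumer recycling a buffer.
    int recycleFromConsumer() {
        synchronized (viewLock) {
            synchronized (poolLock) {
                return ++recycled;
            }
        }
    }

    // Path 2, e.g. the I/O thread returning a buffer, takes the locks in
    // the SAME order instead of the reversed order seen in the trace.
    int returnFromIoThread() {
        synchronized (viewLock) {
            synchronized (poolLock) {
                return ++recycled;
            }
        }
    }
}
```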
[jira] [Created] (FLINK-2396) Review the datasets of dynamic path and static path in iteration.
Chengxiang Li created FLINK-2396: Summary: Review the datasets of dynamic path and static path in iteration. Key: FLINK-2396 URL: https://issues.apache.org/jira/browse/FLINK-2396 Project: Flink Issue Type: Improvement Components: Core Reporter: Chengxiang Li Priority: Minor Currently, Flink caches a dataset on the static path because it assumes the dataset stays the same during the iteration, but this assumption is not always true. Take sampling, for example: the iteration data set is something like the weight vector of a model, and there is another training dataset from which a small sample is taken to update the weight vector in each iteration (e.g. Stochastic Gradient Descent). We expect the sampled dataset to be different in each iteration, but Flink would cache the sampled dataset because it sits on the static path. We should review how Flink identifies the dynamic path and the static path, and support adding the sampled dataset in the above example to the dynamic path.
[jira] [Commented] (FLINK-2395) Check Scala catch blocks for Throwable
[ https://issues.apache.org/jira/browse/FLINK-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638609#comment-14638609 ] Maximilian Michels commented on FLINK-2395: --- On the master, we currently have 72 {{Throwable}} catch blocks in the Scala code. Probably most of them can be safely converted to catch {{Exception}} instead. Some corner cases may remain, e.g. Hadoop uses {{FSError}} subclassed from Throwable to signalize file system errors but that should be handled by Java code. Then of course there are virtual machine errors like {{OutOfMemoryError}}. > Check Scala catch blocks for Throwable > -- > > Key: FLINK-2395 > URL: https://issues.apache.org/jira/browse/FLINK-2395 > Project: Flink > Issue Type: Improvement >Reporter: Till Rohrmann >Priority: Minor > > As described in [1], it's not a good practice to catch {{Throwables}} in > Scala catch blocks because Scala uses some special exceptions for the control > flow. Therefore we should check whether we can restrict ourselves to only > catching subtypes of {{Throwable}}, such as {{Exception}}, instead. > [1] > https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2213) Configure number of vcores
[ https://issues.apache.org/jira/browse/FLINK-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi updated FLINK-2213: --- Assignee: (was: Ufuk Celebi) > Configure number of vcores > -- > > Key: FLINK-2213 > URL: https://issues.apache.org/jira/browse/FLINK-2213 > Project: Flink > Issue Type: Improvement > Components: YARN Client >Affects Versions: master >Reporter: Ufuk Celebi > > Currently, the number of vcores per YARN container is set to 1. > It is desirable to allow configuring this value. As a simple heuristic it > makes sense to at least set it to the number of slots per container. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2395) Check Scala catch blocks for Throwable
[ https://issues.apache.org/jira/browse/FLINK-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638608#comment-14638608 ] Stephan Ewen commented on FLINK-2395: - At some points, catch {{Throwable}} is probably okay, such as at root points in the JobManager / TaskManager process initialization. > Check Scala catch blocks for Throwable > -- > > Key: FLINK-2395 > URL: https://issues.apache.org/jira/browse/FLINK-2395 > Project: Flink > Issue Type: Improvement >Reporter: Till Rohrmann >Priority: Minor > > As described in [1], it's not a good practice to catch {{Throwables}} in > Scala catch blocks because Scala uses some special exceptions for the control > flow. Therefore we should check whether we can restrict ourselves to only > catching subtypes of {{Throwable}}, such as {{Exception}}, instead. > [1] > https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-1863) RemoteInputChannelTest.testConcurrentOnBufferAndRelease fails on travis
[ https://issues.apache.org/jira/browse/FLINK-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi resolved FLINK-1863. Resolution: Cannot Reproduce > RemoteInputChannelTest.testConcurrentOnBufferAndRelease fails on travis > --- > > Key: FLINK-1863 > URL: https://issues.apache.org/jira/browse/FLINK-1863 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Ufuk Celebi > > {code} > testConcurrentOnBufferAndRelease(org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest) > Time elapsed: 120.022 sec <<< ERROR! > java.lang.Exception: test timed out after 12 milliseconds > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:838) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:248) > at java.util.concurrent.FutureTask.get(FutureTask.java:111) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.testConcurrentOnBufferAndRelease(RemoteInputChannelTest.java:124) > {code} > This is the build: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/57943450/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2395) Check Scala catch blocks for Throwable
[ https://issues.apache.org/jira/browse/FLINK-2395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638603#comment-14638603 ] Ufuk Celebi commented on FLINK-2395: Interesting link. Thanks for sharing! > Check Scala catch blocks for Throwable > -- > > Key: FLINK-2395 > URL: https://issues.apache.org/jira/browse/FLINK-2395 > Project: Flink > Issue Type: Improvement >Reporter: Till Rohrmann >Priority: Minor > > As described in [1], it's not a good practice to catch {{Throwables}} in > Scala catch blocks because Scala uses some special exceptions for the control > flow. Therefore we should check whether we can restrict ourselves to only > catching subtypes of {{Throwable}}, such as {{Exception}}, instead. > [1] > https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2332) Assign session IDs to JobManager and TaskManager messages
[ https://issues.apache.org/jira/browse/FLINK-2332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638589#comment-14638589 ] ASF GitHub Bot commented on FLINK-2332: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-124038143 Will rebase and then merge this PR. > Assign session IDs to JobManager and TaskManager messages > - > > Key: FLINK-2332 > URL: https://issues.apache.org/jira/browse/FLINK-2332 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 0.10 > > > In order to support true high availability {{TaskManager}} and {{JobManager}} > have to be able to distinguish whether a message was sent from the leader or > whether a message was sent from a former leader. Messages which come from a > former leader have to be discarded in order to guarantee a consistent state. > A way to do achieve this is to assign a leader session ID to a {{JobManager}} > once he's elected as leader. This leader session ID is sent to the > {{TaskManager}} upon registration at the {{JobManager}}. All subsequent > messages should then be decorated with this leader session ID to mark them as > valid. On the {{TaskManager}} side the received leader session ID as a > response to the registration message, can then be used to validate incoming > messages. > The same holds true for registration messages which should have a > registration session ID, too. That way, it is possible to distinguish invalid > registration messages from valid ones. The registration session ID can be > assigned once the TaskManager is notified about the new leader. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-124038143 Will rebase and then merge this PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-2395) Check Scala catch blocks for Throwable
Till Rohrmann created FLINK-2395: Summary: Check Scala catch blocks for Throwable Key: FLINK-2395 URL: https://issues.apache.org/jira/browse/FLINK-2395 Project: Flink Issue Type: Improvement Reporter: Till Rohrmann Priority: Minor As described in [1], it's not a good practice to catch {{Throwables}} in Scala catch blocks because Scala uses some special exceptions for the control flow. Therefore we should check whether we can restrict ourselves to only catching subtypes of {{Throwable}}, such as {{Exception}}, instead. [1] https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2373) Add configuration parameter to createRemoteEnvironment method
[ https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638525#comment-14638525 ] Andreas Kunft commented on FLINK-2373: -- I will. > Add configuration parameter to createRemoteEnvironment method > - > > Key: FLINK-2373 > URL: https://issues.apache.org/jira/browse/FLINK-2373 > Project: Flink > Issue Type: Bug > Components: other >Reporter: Andreas Kunft >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > Currently there is no way to provide a custom configuration upon creation of > a remote environment (via ExecutionEnvironment.createRemoteEnvironment(...)). > This leads to errors when the submitted job exceeds the default value for the > max. payload size in Akka, as we cannot increase the configuration value > (akka.remote.OversizedPayloadException: Discarding oversized payload...) > Providing an overloaded method with a configuration parameter for the remote > environment fixes that. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
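A hypothetical sketch of the proposed overload, modeled with a self-contained stand-in class rather than the real Flink {{ExecutionEnvironment}} (the class name, the config map, and the {{akka.framesize}} key here are assumptions for illustration): the existing factory signature delegates to a new variant that accepts a configuration, so callers can raise the payload limit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not actual Flink source) of the proposed overload pattern.
public class RemoteEnvironmentSketch {
    final String host;
    final int port;
    final Map<String, String> config;

    RemoteEnvironmentSketch(String host, int port, Map<String, String> config) {
        this.host = host;
        this.port = port;
        this.config = config;
    }

    // Existing signature keeps working: delegates with an empty configuration.
    static RemoteEnvironmentSketch createRemoteEnvironment(String host, int port) {
        return createRemoteEnvironment(host, port, new HashMap<>());
    }

    // Proposed overload: the caller-supplied configuration is passed through.
    static RemoteEnvironmentSketch createRemoteEnvironment(String host, int port,
                                                           Map<String, String> config) {
        return new RemoteEnvironmentSketch(host, port, config);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("akka.framesize", "104857600"); // assumed key, illustration only
        RemoteEnvironmentSketch env = createRemoteEnvironment("jmhost", 6123, conf);
        System.out.println("framesize=" + env.config.get("akka.framesize"));
    }
}
```

The design point is backward compatibility: existing callers compile unchanged, while the new overload gives a hook for any setting that must be known at submission time.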
[GitHub] flink pull request: [FLINK-2205] Fix confusing entries in JM UI Jo...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/927#issuecomment-124019229 @ebautistabar you're the only one who can close the PR. But maybe the new web interface needs the same fixes as you've proposed for the old UI.
[jira] [Commented] (FLINK-2205) Confusing entries in JM Webfrontend Job Configuration section
[ https://issues.apache.org/jira/browse/FLINK-2205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638458#comment-14638458 ] ASF GitHub Bot commented on FLINK-2205: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/927#issuecomment-124019229 @ebautistabar you're the only one who can close the PR. But maybe the new web interface needs the same fixes as you've proposed for the old UI. > Confusing entries in JM Webfrontend Job Configuration section > - > > Key: FLINK-2205 > URL: https://issues.apache.org/jira/browse/FLINK-2205 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Affects Versions: 0.9 >Reporter: Fabian Hueske >Priority: Minor > Labels: starter > > The Job Configuration section of the job history / analyze page of the > JobManager webinterface contains two confusing entries: > - {{Number of execution retries}} is actually the maximum number of retries > and should be renamed accordingly. The default value is -1 and should be > changed to "deactivated" (or 0). > - {{Job parallelism}} which is -1 by default. A parallelism of -1 is not very > meaningful. It would be better to show something like "auto" -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638448#comment-14638448 ] Till Rohrmann commented on FLINK-1901: -- Currently, what happens to decide whether an operator is on a dynamic path is to look at the inputs of the operator: if any of them is dynamic, so is the current operator. The iteration {{DataSets}}, {{WorksetPlaceHolder}}, {{SolutionSetPlaceHolder}} and {{PartialSolutionPlaceHolder}}, are always dynamic. One idea would be to allow other operators to be declared dynamic as well. That way they can also start a dynamic path. Afterwards, we have to make sure that not only the iteration {{DataSets}} get an {{IterationHead}} prepended, which kicks off the iterations, but also all the other operators which start a dynamic path. > Create sample operator for Dataset > -- > > Key: FLINK-1901 > URL: https://issues.apache.org/jira/browse/FLINK-1901 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Theodore Vasiloudis >Assignee: Chengxiang Li > > In order to be able to implement Stochastic Gradient Descent and a number of > other machine learning algorithms we need to have a way to take a random > sample from a Dataset. > We need to be able to sample with or without replacement from the Dataset, > choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
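The propagation rule described in that comment can be sketched as a toy model (not Flink internals; the operator names are invented for illustration): an operator is dynamic if it is declared dynamic, e.g. a placeholder, or if any of its inputs is dynamic.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Flink internals) of dynamic-path propagation through a DAG.
public class DynamicPathDemo {
    static class Op {
        final String name;
        final boolean declaredDynamic; // a placeholder, or explicitly marked dynamic
        final List<Op> inputs = new ArrayList<>();

        Op(String name, boolean declaredDynamic) {
            this.name = name;
            this.declaredDynamic = declaredDynamic;
        }

        // Dynamic if declared so, or if any (transitive) input is dynamic.
        boolean isDynamic() {
            if (declaredDynamic) return true;
            for (Op in : inputs) if (in.isDynamic()) return true;
            return false;
        }
    }

    public static void main(String[] args) {
        Op training = new Op("training-source", false);        // static input
        Op partialSolution = new Op("partial-solution", true); // placeholder: always dynamic
        Op sample = new Op("sample", true);                    // explicitly declared dynamic
        sample.inputs.add(training);
        Op update = new Op("update", false);                   // dynamic via its inputs
        update.inputs.add(partialSolution);
        update.inputs.add(sample);

        System.out.println("training dynamic: " + training.isDynamic());
        System.out.println("sample dynamic:   " + sample.isDynamic());
        System.out.println("update dynamic:   " + update.isDynamic());
    }
}
```

In this model, declaring the sample operator dynamic is what puts its downstream consumers onto the dynamic path even though the training source itself stays static, which matches the proposal in the comment.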
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638429#comment-14638429 ] Till Rohrmann commented on FLINK-1901: -- That's a good idea to break down the task. Do you want to take the lead [~chengxiang li]? > Create sample operator for Dataset > -- > > Key: FLINK-1901 > URL: https://issues.apache.org/jira/browse/FLINK-1901 > Project: Flink > Issue Type: Improvement > Components: Core >Reporter: Theodore Vasiloudis >Assignee: Chengxiang Li > > In order to be able to implement Stochastic Gradient Descent and a number of > other machine learning algorithms we need to have a way to take a random > sample from a Dataset. > We need to be able to sample with or without replacement from the Dataset, > choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)