[jira] [Commented] (FLINK-1343) Branching Join Program Deadlocks
[ https://issues.apache.org/jira/browse/FLINK-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315815#comment-14315815 ]

Ufuk Celebi commented on FLINK-1343:
------------------------------------

I've just checked this with [#356|https://github.com/apache/flink/pulls/356] and the deadlock does not occur. I will finalize the PR after some further tests.

Branching Join Program Deadlocks
Key: FLINK-1343
URL: https://issues.apache.org/jira/browse/FLINK-1343
Project: Flink
Issue Type: Bug
Components: Optimizer
Affects Versions: 0.8, 0.9
Reporter: Fabian Hueske
Assignee: Fabian Hueske

The following program, which gets its data from a single non-parallel data source, branches twice, and joins the branches with two joins, deadlocks.

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;

public class DeadlockProgram {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // single non-parallel source (and a second one used by the working variant)
        DataSet<Long> longs = env.generateSequence(0, 100L).setParallelism(1);
        DataSet<Long> longs2 = env.generateSequence(0, 100L).setParallelism(1);

        // branch the output of the same source
        DataSet<Tuple1<Long>> longT1 = longs.map(new TupleWrapper());
        DataSet<Tuple1<Long>> longT2 = longT1.project(0);
        DataSet<Tuple1<Long>> longT3 = longs.map(new TupleWrapper()); // deadlocks
        //DataSet<Tuple1<Long>> longT3 = longs2.map(new TupleWrapper()); // works

        // join the branches with two joins
        longT2.join(longT3).where(0).equalTo(0).projectFirst(0)
            .join(longT1).where(0).equalTo(0).projectFirst(0)
            .print();

        env.execute();
    }

    public static class TupleWrapper implements MapFunction<Long, Tuple1<Long>> {
        @Override
        public Tuple1<Long> map(Long l) throws Exception {
            return new Tuple1<Long>(l);
        }
    }
}
{code}

If one of the branches reads its data from a second data source (see inline comment) or if the single data source uses the default parallelism, the program executes correctly.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1343) Branching Join Program Deadlocks
[ https://issues.apache.org/jira/browse/FLINK-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315846#comment-14315846 ]

Fabian Hueske commented on FLINK-1343:
-------------------------------------

Very nice! Does that mean we can get rid of the deadlock detection logic in the optimizer (or at least disable the placement of tempBarriers)? Otherwise, we would temp the data twice, right?
[jira] [Commented] (FLINK-1343) Branching Join Program Deadlocks
[ https://issues.apache.org/jira/browse/FLINK-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14314326#comment-14314326 ]

Fabian Hueske commented on FLINK-1343:
-------------------------------------

After looking into the execution plan of the deadlocking program and getting stack traces of the deadlocked execution, I think I found the reason for this deadlock.

The first join is executed as a HybridHashJoin. Both of its inputs are fed from the same data source. To prevent a deadlock, the optimizer places a tempBarrier on the probe-side input. However, the implementation of the join ({{MatchDriver}}) blocks until both inputs are available (lines 103/104: {{this.taskContext.getInput()}}) before it starts building the hash table. This causes the deadlock, because the build side is not fully consumed first, as the optimizer assumes. Instead, the HashJoin waits for the probe side to become available from the tempBarrier and gets stuck: since the build input is not consumed, the shared source cannot make progress, so the tempBarrier never receives its complete input.

Forcing the first join to be executed as a MergeJoin fixes the deadlock.
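The mechanism described in the comment above can be reproduced with a toy model. The sketch below is not Flink code and makes several assumptions: `ArrayBlockingQueue`s stand in for pipelined channels with limited buffer space, a thread stands in for the tempBarrier (it reads the whole probe input before releasing it), and the hypothetical flag `waitForBothInputs` models a join driver that requests both inputs before it builds the hash table. The class name, record count, channel capacity and 2-second timeout are arbitrary illustration choices.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Toy model of the FLINK-1343 deadlock, NOT Flink code: bounded queues stand in for
// pipelined channels, a thread stands in for the tempBarrier, the join is simplified.
class BranchingJoinDeadlockSketch {

    private static final long EOF = -1L; // end-of-input marker (assumes records are >= 0)

    /**
     * @param waitForBothInputs hypothetical flag: if true, the join waits until the probe
     *                          side is available before it reads the build side
     * @return true if the join finished within the timeout, false if the pipeline stalled
     */
    static boolean run(boolean waitForBothInputs, int records, int channelCapacity)
            throws InterruptedException {
        BlockingQueue<Long> buildChannel = new ArrayBlockingQueue<>(channelCapacity);
        BlockingQueue<Long> probeChannel = new ArrayBlockingQueue<>(channelCapacity);
        List<Long> materializedProbe = new ArrayList<>();      // the barrier's "temp" storage
        CountDownLatch probeAvailable = new CountDownLatch(1); // opened once the barrier is done
        CountDownLatch joinDone = new CountDownLatch(1);

        // Single non-parallel source whose output is branched to both join inputs.
        Thread source = new Thread(() -> {
            try {
                for (long i = 0; i < records; i++) {
                    buildChannel.put(i); // blocks while the build channel is full
                    probeChannel.put(i);
                }
                buildChannel.put(EOF);
                probeChannel.put(EOF);
            } catch (InterruptedException e) {
                // stopped by the caller after the timeout
            }
        });

        // tempBarrier on the probe side: reads its input to the end, then releases it.
        Thread tempBarrier = new Thread(() -> {
            try {
                for (long v = probeChannel.take(); v != EOF; v = probeChannel.take()) {
                    materializedProbe.add(v);
                }
                probeAvailable.countDown();
            } catch (InterruptedException e) {
                // stopped by the caller after the timeout
            }
        });

        // Hash join: build a table from the build side, then probe it.
        Thread join = new Thread(() -> {
            try {
                if (waitForBothInputs) {
                    probeAvailable.await(); // build side is left unconsumed while waiting
                }
                Set<Long> table = new HashSet<>();
                for (long v = buildChannel.take(); v != EOF; v = buildChannel.take()) {
                    table.add(v);
                }
                probeAvailable.await(); // probe phase needs the materialized probe side
                long matches = materializedProbe.stream().filter(table::contains).count();
                if (matches == records) { // every key should find its partner
                    joinDone.countDown();
                }
            } catch (InterruptedException e) {
                // stopped by the caller after the timeout
            }
        });

        for (Thread t : new Thread[] {source, tempBarrier, join}) {
            t.setDaemon(true);
            t.start();
        }
        boolean finished = joinDone.await(2, TimeUnit.SECONDS);
        source.interrupt();
        tempBarrier.interrupt();
        join.interrupt();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("join waits for both inputs first: finished = " + run(true, 100, 10));
        System.out.println("join consumes build side first:   finished = " + run(false, 100, 10));
    }
}
```

In this model, when the join consumes the build side before asking for the probe side, the source can keep writing to both branches and `run(false, ...)` completes. When the join first waits for both inputs, the build channel fills up, the source blocks, the barrier never sees end-of-input, and `run(true, ...)` reports a stall after the timeout.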