[jira] [Commented] (FLINK-1343) Branching Join Program Deadlocks

2015-02-11 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14315815#comment-14315815
 ] 

Ufuk Celebi commented on FLINK-1343:


I've just checked this with [#356|https://github.com/apache/flink/pulls/356] 
and the deadlock does not occur. I will finalize the PR after some further 
tests.

 Branching Join Program Deadlocks
 

 Key: FLINK-1343
 URL: https://issues.apache.org/jira/browse/FLINK-1343
 Project: Flink
  Issue Type: Bug
  Components: Optimizer
Affects Versions: 0.8, 0.9
Reporter: Fabian Hueske
Assignee: Fabian Hueske

 The following program deadlocks. It reads its data from a single non-parallel
 data source, branches twice, and joins the branches with two joins.
 {code:java}
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;

 public class DeadlockProgram {

     public static void main(String[] args) throws Exception {
         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

         // single non-parallel data source that is branched below
         DataSet<Long> longs = env.generateSequence(0, 100L).setParallelism(1);
         // second, independent data source (only used by the working variant)
         DataSet<Long> longs2 = env.generateSequence(0, 100L).setParallelism(1);

         DataSet<Tuple1<Long>> longT1 = longs.map(new TupleWrapper());
         DataSet<Tuple1<Long>> longT2 = longT1.project(0);
         DataSet<Tuple1<Long>> longT3 = longs.map(new TupleWrapper()); // deadlocks
         // DataSet<Tuple1<Long>> longT3 = longs2.map(new TupleWrapper()); // works

         longT2.join(longT3).where(0).equalTo(0).projectFirst(0)
             .join(longT1).where(0).equalTo(0).projectFirst(0)
             .print();

         env.execute();
     }

     // wraps each Long into a Tuple1 so it can be joined on field 0
     public static class TupleWrapper implements MapFunction<Long, Tuple1<Long>> {
         @Override
         public Tuple1<Long> map(Long l) throws Exception {
             return new Tuple1<Long>(l);
         }
     }
 }
 {code}
 If one of the branches reads its data from a second data source (see inline 
 comment) or if the single data source uses the default parallelism, the 
 program executes correctly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1343) Branching Join Program Deadlocks

2015-02-11 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14315846#comment-14315846
 ] 

Fabian Hueske commented on FLINK-1343:
--

Very nice!
Does that mean we can get rid of the deadlock detection logic in the optimizer
(or at least disable the placement of tempBarriers)?
Otherwise, we would temp (materialize) the data twice, right?



[jira] [Commented] (FLINK-1343) Branching Join Program Deadlocks

2015-02-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14314326#comment-14314326
 ] 

Fabian Hueske commented on FLINK-1343:
--

After looking into the execution plan of the deadlocking program and getting
stack traces of the deadlocked execution, I think I found the reason for this
deadlock.

The first join is executed as a HybridHashJoin, and both of its inputs are fed
from the same data source. In order to prevent a deadlock, the optimizer places
a tempBarrier on the probe-side input. However, the join implementation
({{MatchDriver}}) blocks until both inputs are available (lines 103/104:
{{this.taskContext.getInput()}}) before it starts building the hash table. This
behavior causes the deadlock: the optimizer assumes that the build side is
fully consumed first, but instead the HashJoin waits for the probe side to
become available from the tempBarrier and gets stuck because the build input
is never consumed.

Forcing the first join to be executed as a MergeJoin fixes the deadlock.
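The mechanism described above can be illustrated with a toy model in plain Java. This is a sketch under stated assumptions, not Flink code: the class name `BranchingJoinDeadlockModel`, the bounded `ArrayBlockingQueue`s standing in for network buffers, the end-of-stream marker, and the 2-second stall timeout are all hypothetical. One thread plays the single non-parallel source and feeds both join inputs record by record. A second thread plays the probe-side tempBarrier, which releases its data only after it has seen the end of its input. The join driver either waits for both inputs before building (the behavior described for {{MatchDriver}}) or drains the build side first (what the optimizer assumes).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Toy, stdlib-only model of the shared-source hash-join deadlock (hypothetical, not Flink code). */
public class BranchingJoinDeadlockModel {

    private static final long EOS = -1L;               // end-of-stream marker; records are >= 0
    private static final long STALL_TIMEOUT_MS = 2000; // hypothetical stall-detection timeout

    /** Returns the number of join matches, or -1 if the join driver stalled. */
    static int run(int records, int bufferCapacity, boolean waitForBothInputs) {
        BlockingQueue<Long> build = new ArrayBlockingQueue<>(bufferCapacity);
        BlockingQueue<Long> probe = new ArrayBlockingQueue<>(bufferCapacity);
        List<Long> tempBuffer = new ArrayList<>();      // data held by the probe-side temp barrier
        CountDownLatch probeReady = new CountDownLatch(1);

        // Single non-parallel source feeding BOTH join inputs, record by record.
        Thread source = new Thread(() -> {
            try {
                for (long i = 0; i < records; i++) {
                    build.put(i);                       // blocks while the build buffer is full
                    probe.put(i);
                }
                build.put(EOS);
                probe.put(EOS);
            } catch (InterruptedException e) {
                // cancelled after a detected stall
            }
        });

        // Temp barrier: materialize the whole probe input, then release it at once.
        Thread tempBarrier = new Thread(() -> {
            try {
                for (long v = probe.take(); v != EOS; v = probe.take()) {
                    tempBuffer.add(v);
                }
                probeReady.countDown();
            } catch (InterruptedException e) {
                // cancelled after a detected stall
            }
        });

        source.setDaemon(true);
        tempBarrier.setDaemon(true);
        source.start();
        tempBarrier.start();
        try {
            // If the data exceeds the build buffer, this wait cannot finish: the source
            // is blocked on the unconsumed build side, so the probe side never sees EOS.
            if (waitForBothInputs && !probeReady.await(STALL_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                return -1;
            }
            // Build phase: drain the build input completely into a hash table.
            Set<Long> table = new HashSet<>();
            for (long v = build.take(); v != EOS; v = build.take()) {
                table.add(v);
            }
            // Probe phase: read the input materialized by the temp barrier.
            if (!probeReady.await(STALL_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                return -1;
            }
            int matches = 0;
            for (long v : tempBuffer) {
                if (table.contains(v)) {
                    matches++;
                }
            }
            return matches;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            source.interrupt();
            tempBarrier.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("wait for both inputs: " + run(100, 10, true));  // -1 (stalls)
        System.out.println("build side first:     " + run(100, 10, false)); // 100
    }
}
```

With 100 records and a buffer capacity of 10, waiting for both inputs before building stalls, while draining the build side first completes with 100 matches. In this model, inputs that fit completely into the buffers (for example `run(5, 10, true)`) complete in either mode.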
