spark git commit: [SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3

2018-03-07 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 1dd37ff3b -> 404f7e201


[SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3

This is a backport of #20598.

## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary 
for the catalyst analyzer to convert the self-join logical plan DAG into a tree 
(by creating new instances of the leaf relations). This was causing the error 
`Failure when resolving conflicting references in Join:` (see JIRA for details).
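
As a rough illustration of why fresh leaf instances matter, here is a toy model in plain Scala. It does not use Catalyst: `Attr`, `Leaf`, `Join`, `conflicts`, and `dedupRight` are hypothetical names invented for this sketch, and `newInstance()` only imitates the role that MultiInstanceRelation plays for real relations.

```scala
// Toy model only: these types are illustrative and are NOT Catalyst's API.
object MultiInstanceSketch {
  private var nextId = 0L
  private def freshId(): Long = { nextId += 1; nextId }

  // An attribute is identified by its expression ID, not by its name.
  final case class Attr(name: String, id: Long)

  sealed trait Plan { def output: Seq[Attr] }

  // A leaf that can produce a copy of itself with fresh attribute IDs.
  final case class Leaf(source: String, output: Seq[Attr]) extends Plan {
    def newInstance(): Leaf =
      copy(output = output.map(a => a.copy(id = freshId())))
  }

  final case class Join(left: Plan, right: Plan) extends Plan {
    def output: Seq[Attr] = left.output ++ right.output

    // IDs produced by both sides; a non-empty set means the join is ambiguous.
    def conflicts: Set[Long] =
      left.output.map(_.id).toSet.intersect(right.output.map(_.id).toSet)
  }

  // Replace a conflicting right-hand leaf with a fresh instance of itself.
  def dedupRight(j: Join): Join = j.right match {
    case l: Leaf if j.conflicts.nonEmpty => j.copy(right = l.newInstance())
    case _                               => j
  }

  def main(args: Array[String]): Unit = {
    val leaf = Leaf("Memory", Seq(Attr("value", freshId())))
    val selfJoin = Join(leaf, leaf)      // the same leaf on both sides: a DAG
    val resolved = dedupRight(selfJoin)  // the right side now has its own IDs
    println(s"conflicts before: ${selfJoin.conflicts.size}, after: ${resolved.conflicts.size}")
  }
}
```

In this sketch a leaf without `newInstance()` would leave the conflict in place, which loosely corresponds to the analysis failure described above.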

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the 
StreamingExecutionRelation), we were rewriting the attribute references in the 
streaming plan with the new attribute references from the batch plan. This 
mishandled the case where multiple StreamingExecutionRelation nodes point to the 
same source, and therefore eventually to the same batch plan returned by the 
source. Here is an example query and its corresponding plan transformations.
```
val df = input.toDF
val join =
  df.select('value % 5 as "key", 'value).join(
    df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- StreamingExecutionRelation Memory[#1], value#12  // two different leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]   // replaces StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- LocalRelation [value#66]   // replaces StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for each 
spliced plan, the new output attribute (value#66) replaces the earlier output 
attributes (value#1 and value#12, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]   // both value#1 and value#12 replaced by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
      +- LocalRelation [value#66]
```
Since both sides of the join now expose the same attribute, the optimizer 
eliminates value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]   // this does not generate value, incorrect join results
      +- LocalRelation [value#66]
```
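
The collapse of two distinct attributes into one can be reproduced with a toy sketch in plain Scala. This is not the MicroBatchExecution code: `Attr` and `rewrite` are hypothetical stand-ins, and the IDs are copied from the plans above.

```scala
// Toy model only: a simplified imitation of the old attribute rewrite.
object RewriteSketch {
  final case class Attr(name: String, id: Int) {
    override def toString: String = s"$name#$id"
  }

  // Substitute every streaming attribute with the batch plan's attribute.
  def rewrite(attrs: Seq[Attr], mapping: Map[Attr, Attr]): Seq[Attr] =
    attrs.map(a => mapping.getOrElse(a, a))

  def main(args: Array[String]): Unit = {
    val key6    = Attr("key", 6)
    val value1  = Attr("value", 1)   // output of the first streaming leaf
    val value12 = Attr("value", 12)  // output of the second streaming leaf
    val value66 = Attr("value", 66)  // output of the shared batch plan

    val topProject = Seq(key6, value1, value12)
    // Both leaves point to the same source, so both map to value#66.
    val mapping = Map(value1 -> value66, value12 -> value66)

    val rewritten = rewrite(topProject, mapping)
    println(rewritten.mkString("Project [", ", ", "]"))
    // prints: Project [key#6, value#66, value#66]
    println(rewritten.distinct.size)
    // prints: 2 (three output columns, but only two distinct attributes)
  }
}
```

After the rewrite nothing in the top-level projection records which side of the join each `value#66` came from.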

**Solution**: Instead of rewriting attributes, use a Project to introduce 
aliases between the output attribute references and the new attribute 
references generated by the spliced plans. The analyzer and optimizer will take 
care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]   // solution: project with aliases
   :     +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- Project [value#66 AS value#12]   // solution: project with aliases
         +- LocalRelation [value#66]
```
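
The aliasing idea can be sketched in the same toy style. Again, this is plain Scala rather than the actual patch: `Attr`, `Alias`, `LocalRelation`, `Project`, and `splice` are simplified, hypothetical stand-ins for the Catalyst concepts of the same names.

```scala
// Toy model only: wraps a shared batch plan in an aliasing Project.
object AliasSketch {
  final case class Attr(name: String, id: Int) {
    override def toString: String = s"$name#$id"
  }
  final case class Alias(child: Attr, target: Attr) {
    override def toString: String = s"$child AS $target"
  }

  sealed trait Plan { def output: Seq[Attr] }
  final case class LocalRelation(output: Seq[Attr]) extends Plan
  final case class Project(aliases: Seq[Alias], child: Plan) extends Plan {
    // The Project exposes the alias targets, not the child's attributes.
    def output: Seq[Attr] = aliases.map(_.target)
  }

  // Re-expose the attribute IDs that the streaming leaf originally had.
  def splice(streamingOutput: Seq[Attr], batch: Plan): Project = {
    require(streamingOutput.size == batch.output.size, "output arity mismatch")
    val aliases = batch.output.zip(streamingOutput).map {
      case (from, to) => Alias(from, to)
    }
    Project(aliases, batch)
  }

  def main(args: Array[String]): Unit = {
    val batch = LocalRelation(Seq(Attr("value", 66)))  // shared batch plan
    val left  = splice(Seq(Attr("value", 1)), batch)
    val right = splice(Seq(Attr("value", 12)), batch)

    println(left.aliases.mkString("Project [", ", ", "]"))
    // prints: Project [value#66 AS value#1]
    println(right.aliases.mkString("Project [", ", ", "]"))
    // prints: Project [value#66 AS value#12]
    println((left.output ++ right.output).mkString(", "))
    // prints: value#1, value#12
  }
}
```

Because each side keeps its original attribute ID, the plan above the splice point needs no rewriting and the two join inputs stay distinguishable.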

## How was this patch tested?
New unit test

Author: Tathagata Das 

Closes #20765 from tdas/SPARK-23406-2.3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/404f7e20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/404f7e20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/404f7e20

Branch: refs/heads/branch-2.3
Commit: 404f7e2013ecfdf993a17fd942d8890d9a8100e7
Parents: 1dd37ff
Author: Tathagata Das 
Authored: Wed Mar 7 21:58:57 2018 -0800
Committer: Tathagata Das 
Committed: Wed Mar 7 21:58:57 2018 -0800

--
 .../streaming/MicroBatchExecution.scala | 16 ++---
 .../execution/streaming/StreamingRelation.scala | 20 +++-
 .../sql/streaming/StreamingJoinSuite.scala  | 25 +++-
 3 files changed, 45 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/404f7e20/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

spark git commit: [SPARK-23406][SS] Enable stream-stream self-joins

2018-02-14 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 400a1d9e2 -> 658d9d9d7


[SPARK-23406][SS] Enable stream-stream self-joins

## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary 
for the catalyst analyzer to convert the self-join logical plan DAG into a tree 
(by creating new instances of the leaf relations). This was causing the error 
`Failure when resolving conflicting references in Join:` (see JIRA for details).

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the 
StreamingExecutionRelation), we were rewriting the attribute references in the 
streaming plan with the new attribute references from the batch plan. This 
mishandled the case where multiple StreamingExecutionRelation nodes point to the 
same source, and therefore eventually to the same batch plan returned by the 
source. Here is an example query and its corresponding plan transformations.
```
val df = input.toDF
val join =
  df.select('value % 5 as "key", 'value).join(
    df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- StreamingExecutionRelation Memory[#1], value#12  // two different leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]   // replaces StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- LocalRelation [value#66]   // replaces StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for each 
spliced plan, the new output attribute (value#66) replaces the earlier output 
attributes (value#1 and value#12, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]   // both value#1 and value#12 replaced by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
      +- LocalRelation [value#66]
```
Since both sides of the join now expose the same attribute, the optimizer 
eliminates value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]   // this does not generate value, incorrect join results
      +- LocalRelation [value#66]
```

**Solution**: Instead of rewriting attributes, use a Project to introduce 
aliases between the output attribute references and the new attribute 
references generated by the spliced plans. The analyzer and optimizer will take 
care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]   // solution: project with aliases
   :     +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- Project [value#66 AS value#12]   // solution: project with aliases
         +- LocalRelation [value#66]
```

## How was this patch tested?
New unit test

Author: Tathagata Das 

Closes #20598 from tdas/SPARK-23406.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/658d9d9d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/658d9d9d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/658d9d9d

Branch: refs/heads/master
Commit: 658d9d9d785a30857bf35d164e6cbbd9799d6959
Parents: 400a1d9
Author: Tathagata Das 
Authored: Wed Feb 14 14:27:02 2018 -0800
Committer: Tathagata Das 
Committed: Wed Feb 14 14:27:02 2018 -0800

--
 .../streaming/MicroBatchExecution.scala | 16 ++---
 .../execution/streaming/StreamingRelation.scala | 20 +++-
 .../sql/streaming/StreamingJoinSuite.scala  | 25 +++-
 3 files changed, 45 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/658d9d9d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala