[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark

2017-05-29 Thread Adam Szita (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16028636#comment-16028636
 ] 

Adam Szita commented on PIG-5207:
-

Patch committed to trunk; thanks for the review, Liyun and Rohini!

> BugFix e2e tests fail on spark
> --
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Adam Szita
>Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch, PIG-5207.1.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark

2017-05-29 Thread Rohini Palaniswamy (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16028616#comment-16028616
 ] 

Rohini Palaniswamy commented on PIG-5207:
-

+1

> BugFix e2e tests fail on spark
> --
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Adam Szita
>Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch, PIG-5207.1.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark

2017-05-19 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16016999#comment-16016999
 ] 

liyunzhang_intel commented on PIG-5207:
---

[~rohini]: can you spend some time reviewing the modification of 
PhysicalPlan.java?

> BugFix e2e tests fail on spark
> --
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Adam Szita
>Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch, PIG-5207.1.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark

2017-04-05 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958153#comment-15958153
 ] 

liyunzhang_intel commented on PIG-5207:
---

[~szita]: +1 for PIG-5207.1.patch. [~rohini]: can you spend some time helping 
to review the modification of PhysicalPlan.java?

> BugFix e2e tests fail on spark
> --
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Adam Szita
>Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch, PIG-5207.1.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark

2017-04-05 Thread Adam Szita (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15956840#comment-15956840
 ] 

Adam Szita commented on PIG-5207:
-

[~kellyzly] we need an array of operators because otherwise we would get a 
ConcurrentModificationException when removing items inside the loop while 
iterating over the Collection.
I also uploaded a new patch and added the check {{if 
(pplan.getPredecessors(originalUdfInput) != null)}}, since trimAbove might fail 
if the operator has no predecessors (I've seen this while running unit tests).
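
For illustration, a minimal sketch of that pattern (plain strings and a hypothetical list instead of Pig operators, so not the actual CombinerOptimizer code): removing elements while iterating the live list throws, whereas iterating over a copied array is safe.
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyBeforeRemoveSketch {
    public static void main(String[] args) {
        List<String> predecessors = new ArrayList<>(Arrays.asList("proj0", "proj1", "proj2"));

        // This would throw ConcurrentModificationException: the enhanced for-loop
        // iterates the live list while elements are removed from it.
        // for (String p : predecessors) { predecessors.remove(p); }

        // Safe: iterate over a snapshot array, then mutate the original list freely.
        for (String p : predecessors.toArray(new String[0])) {
            predecessors.remove(p);
        }
        System.out.println(predecessors); // prints []
    }
}
{code}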

> BugFix e2e tests fail on spark
> --
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Adam Szita
>Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch, PIG-5207.1.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark

2017-04-03 Thread liyunzhang_intel (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954304#comment-15954304
 ] 

liyunzhang_intel commented on PIG-5207:
---

[~szita]: one question about the modification in CombinerOptimizer.java.
{code}
for (PhysicalOperator originalUdfInput :
        pplan.getPredecessors(combineUdf).toArray(new PhysicalOperator[0])) {
    pplan.trimAbove(originalUdfInput);
    pplan.remove(originalUdfInput);
}
{code}

Do we really need to convert the List to an array here?
{code}pplan.getPredecessors(combineUdf).toArray(new PhysicalOperator[0]){code}

> BugFix e2e tests fail on spark
> --
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Adam Szita
>Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark

2017-04-03 Thread Adam Szita (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953426#comment-15953426
 ] 

Adam Szita commented on PIG-5207:
-

[~rohini]: in our case the order of the edges in mToEdges (from one node to the 
others) matters, since the other nodes are POProject operators which each have 
a certain column set in them. That order maps to the order the user specifies 
when calling the COR function, e.g. {{COR(var0col, var1col, var2col)}}. The 
ordering is held by the implementation, which is basically an {{ArrayList}}.

You can test this with the following use case which is similar to what we have 
in this E2E test:
{code}
PhysicalPlan plan = new PhysicalPlan();
//Creating ops
PhysicalOperator proj0 = new POProject(new OperatorKey("scope",0),1,1);
PhysicalOperator proj1 = new POProject(new OperatorKey("scope",1),1,0);

PhysicalOperator proj2 = new POProject(new OperatorKey("scope",2),1,1);
PhysicalOperator proj3 = new POProject(new OperatorKey("scope",3),1,1);

PhysicalOperator proj4 = new POProject(new OperatorKey("scope",4),1,1);
PhysicalOperator proj5 = new POProject(new OperatorKey("scope",5),1,2);

POUserFunc udfOp = new POUserFunc(new OperatorKey("scope",6), 1,
        Lists.newArrayList(proj1, proj3, proj5), new FuncSpec(COR.class.getCanonicalName()));

//Adding and connecting ops
plan.add(proj0);
plan.add(proj1);
plan.connect(proj0, proj1);

plan.add(proj2);
plan.add(proj3);
plan.connect(proj2, proj3);

plan.add(proj4);
plan.add(proj5);
plan.connect(proj4, proj5);

plan.add(udfOp);
plan.connect(proj1, udfOp);
plan.connect(proj3, udfOp);
plan.connect(proj5, udfOp);

PhysicalPlan clonedPlan = plan.clone();

//mToEdges is protected...
Field f = OperatorPlan.class.getDeclaredField("mToEdges");
f.setAccessible(true);
MultiMap originalToEdgesMap = (MultiMap) f.get(plan);
MultiMap clonedToEdgesMap = (MultiMap) f.get(clonedPlan);

System.out.println("Original column order");
for (Object op : originalToEdgesMap.keySet()) {
  if (op instanceof POUserFunc) {
    for (Object entry : (List) originalToEdgesMap.get(op)) {
      System.out.println(((POProject) entry).getColumn());
    }
  }
}
System.out.println("Cloned column order");
for (Object op : clonedToEdgesMap.keySet()) {
  if (op instanceof POUserFunc) {
    for (Object entry : (List) clonedToEdgesMap.get(op)) {
      System.out.println(((POProject) entry).getColumn());
    }
  }
}
{code}

This gives me:
{code}
Original column order
0
1
2
Cloned column order
2
0
1
{code}
The plan constructed in the example is:
{code}
POUserFunc(org.apache.pig.builtin.COR)[tuple] - scope-6
|
|---Project[tuple][0] - scope-1
|   |
|   |---Project[tuple][1] - scope-0
|
|---Project[tuple][1] - scope-3
|   |
|   |---Project[tuple][1] - scope-2
|
|---Project[tuple][2] - scope-5
    |
    |---Project[tuple][1] - scope-4
{code}

> BugFix e2e tests fail on spark
> --
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Adam Szita
>Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark

2017-03-31 Thread Rohini Palaniswamy (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15951729#comment-15951729
 ] 

Rohini Palaniswamy commented on PIG-5207:
-

bq. This is due to another plan generation bug and it is actually observable in 
the plan above, that POUserFunc has Project 0,2,1 as its input (instead of 
0,1,2).

That would be a serious bug, and we would have encountered it before, as we do 
a lot of cloning with union. Why do edges come into play for the inputs of 
POUserFunc? Isn't this handled by 
https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java#L268-L288
, which clones the inputs in the exact order?

> BugFix e2e tests fail on spark
> --
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Adam Szita
>Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark

2017-03-31 Thread Adam Szita (JIRA)

[ 
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950940#comment-15950940
 ] 

Adam Szita commented on PIG-5207:
-

This test case uses the COR UDF to calculate the correlation between 3 variables.
The cause of the issue is that when execution reaches the Final implementation 
of the COR UDF, there are some junk bags in the input Tuple:
{code}
--input
  |--DefaultBag: {((943629.189954,19810.987,476680.0,52620.3574006,2.5723842E7)}
  |--DefaultBag: {((157499.53767599948,19810.987,52620.3574006,52620.3574006,503441.4849212208)}
  |--InternalCachedBag:
     |--0 = {BinSedesTuple@7900} "(157499.53767599948,19810.987,52620.3574006,52620.3574006,503441.4849212208)"
     |--1 = {Long@7901} "1"
     |--2 = {BinSedesTuple@7902} "(943629.189954,19810.987,476680.0,52620.3574006,2.5723842E7)"
     |--3 = {Long@7903} "1"
     |--4 = {BinSedesTuple@7904} "(2509050.849514,52620.3574006,476680.0,503441.4849212208,2.5723842E7)"
     |--5 = {Long@7905} "1"
{code}

The real result to be consumed is at position 2, but of course, since we expect 
a single entry here, the implementation queries {{.get(0)}} and shortly 
afterwards we get a ClassCastException.
*This is because of a fault in Spark's CombinerOptimizer: it doesn't remove all 
of the original input projections of the POUserFunc.Final part*, as seen 
[here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L299]:
{{PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);}} 
// only the first input is removed (this is why we didn't see this issue when 
using, for example, the AVG UDF)
As seen in the plan:
{code}
Spark node scope-38
D: 
Store(file:/tmp/temp-1343607396/tmp944122059:org.apache.pig.impl.io.InterStorage)
 - scope-37
|
|---D: New For Each(false,true)[tuple] - scope-47 (postReduceFE)
|   |
|   Project[chararray][0] - scope-39
|   |
|   Project[bag][1] - scope-43
|   |
|   |---Project[bag][1] - scope-42
|   
|   POUserFunc(org.apache.pig.builtin.COR$Final)[bag] - scope-46
|   |
|   |---Project[bag][0] - scope-41   << residual unneeded projection, remains from the original UDF's input
|   |   |
|   |   |---Project[bag][1] - scope-40
|   |
|   |---Project[bag][2] - scope-45   << residual unneeded projection, remains from the original UDF's input
|   |   |
|   |   |---Project[bag][1] - scope-44
|   |
|   |---Project[bag][1] - scope-67   << actual subresult comes in here
|
|---C: Reduce By(false,false)[tuple] - scope-57  (cfe)
|   |
|   Project[chararray][0] - scope-58
{code}

After fixing this I saw that, although I now get good results, their order is 
off: the actual correlation values between var0-var1, etc. are shifted with 
respect to MR's output.
This is due to another plan generation bug, and it is actually observable in 
the plan above: {{POUserFunc}} has Project {{0,2,1}} as its input (instead of 
{{0,1,2}}).

The root of this problem is the cloning of the ForEach operator 
[here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L141].
 Cloning it triggers cloning of the associated PhysicalPlan instance, and 
unfortunately [that 
method|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java#L217]
 has a bug:

It doesn't keep the order of the {{List}} values in {{mToEdges}}, because it 
only considers {{mFromEdges}} when connecting the cloned Operator instances.
Keeping the ordering cannot be achieved by looking only at {{mToEdges}} or only 
at {{mFromEdges}}, since both operate with lists.
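
To illustrate that point with plain collections (hypothetical string keys, not Pig classes): rebuilding the to-edges map solely from the from-edges map preserves connectivity, but the per-key ordering of the resulting predecessor lists depends on the iteration order of the from-edges map, not on the original to-edges order.
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EdgeOrderSketch {
    public static void main(String[] args) {
        // Suppose the original plan recorded udf's predecessors as [proj1, proj3, proj5],
        // but the from-edges map happens to iterate its keys in a different order.
        Map<String, List<String>> fromEdges = new LinkedHashMap<>();
        fromEdges.put("proj5", Arrays.asList("udf"));
        fromEdges.put("proj1", Arrays.asList("udf"));
        fromEdges.put("proj3", Arrays.asList("udf"));

        // Reconnect the "clones" by walking only fromEdges:
        Map<String, List<String>> rebuiltToEdges = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : fromEdges.entrySet()) {
            for (String to : e.getValue()) {
                rebuiltToEdges.computeIfAbsent(to, k -> new ArrayList<>()).add(e.getKey());
            }
        }

        // Prints [proj5, proj1, proj3]: the original [proj1, proj3, proj5] order is lost.
        System.out.println(rebuiltToEdges.get("udf"));
    }
}
{code}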

As a fix for this I'm sorting these lists in the cloned plan according to the 
order in the original plan (using the {{matches}} map that maps original 
operators to their clones).
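
A hedged sketch of that kind of reordering (a generic helper with assumed names, not the committed patch): given the original predecessor list and the matches map from original to cloned operators, rebuild the cloned list in the original order.
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class EdgeReorderSketch {
    // Illustrative only: rebuild clonedPreds so it follows the order of originalPreds,
    // using the matches map (original operator -> cloned operator).
    static <T> void restoreOrder(List<T> originalPreds, List<T> clonedPreds, Map<T, T> matches) {
        List<T> reordered = new ArrayList<>(clonedPreds.size());
        for (T original : originalPreds) {
            T clone = matches.get(original);
            if (clone != null && clonedPreds.contains(clone)) {
                reordered.add(clone);
            }
        }
        clonedPreds.clear();
        clonedPreds.addAll(reordered);
    }
}
{code}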

This doesn't come up in MR mode because the ForEach op is not cloned there but 
rather modified in place during combiner optimization. We could do the same in 
Spark too, but I feel this should rather be fixed in the common code so that 
future callers of clone() benefit as well.

[~kellyzly] can you review the spark parts in CombinerOptimizer.java please?
[~rohini] can you please review the common parts in PhysicalPlan.java?

Please find the fix in my patch: [^PIG-5207.0.patch]

> BugFix e2e tests fail on spark
> --
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
>  Issue Type: Sub-task
>  Components: spark
>Reporter: Adam Szita
>Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final