[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark
[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16028636#comment-16028636 ]

Adam Szita commented on PIG-5207:
---------------------------------

Patch committed to trunk, thanks for the review, Liyun and Rohini!

> BugFix e2e tests fail on spark
> ------------------------------
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Adam Szita
> Assignee: Adam Szita
> Fix For: spark-branch
> Attachments: PIG-5207.0.patch, PIG-5207.1.patch
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark
[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16028616#comment-16028616 ]

Rohini Palaniswamy commented on PIG-5207:
-----------------------------------------

+1
[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark
[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16016999#comment-16016999 ]

liyunzhang_intel commented on PIG-5207:
---------------------------------------

[~rohini]: could you take some time to review the changes to PhysicalPlan.java?
[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark
[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958153#comment-15958153 ]

liyunzhang_intel commented on PIG-5207:
---------------------------------------

[~szita]: +1 for PIG-5207.1.patch.
[~rohini]: could you take some time to help review the changes to PhysicalPlan.java?
[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark
[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15956840#comment-15956840 ]

Adam Szita commented on PIG-5207:
---------------------------------

[~kellyzly] we need an array of operators because otherwise we would get a ConcurrentModificationException for removing items inside the loop while iterating over the Collection itself.
I also uploaded a new patch and added this check: {{if (pplan.getPredecessors(originalUdfInput) != null)}}, since trimAbove might fail if the operator has no predecessors (I've seen this while running unit tests).
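The point about iterating over an array can be illustrated outside of Pig. The following is only a minimal sketch using a plain {{ArrayList}} of strings in place of the predecessor list; the class and method names are hypothetical and are not part of the patch. It shows that removing elements while iterating the live list trips the fail-fast iterator, whereas iterating a {{toArray}} snapshot does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical stand-in for the CombinerOptimizer loop; strings replace PhysicalOperator.
public class SnapshotIterationSketch {

    /** Removes elements while iterating the live list; returns true if the iterator failed. */
    static boolean removeWhileIteratingLiveList(List<String> predecessors) {
        try {
            for (String op : predecessors) {
                predecessors.remove(op); // structural change underneath the iterator
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Iterates a snapshot array, so removing from the list itself is safe. */
    static void removeUsingSnapshot(List<String> predecessors) {
        for (String op : predecessors.toArray(new String[0])) {
            predecessors.remove(op);
        }
    }

    public static void main(String[] args) {
        List<String> live = new ArrayList<>(Arrays.asList("proj1", "proj3", "proj5"));
        System.out.println("live iteration failed: " + removeWhileIteratingLiveList(live));

        List<String> snapshot = new ArrayList<>(Arrays.asList("proj1", "proj3", "proj5"));
        removeUsingSnapshot(snapshot);
        System.out.println("remaining after snapshot removal: " + snapshot);
    }
}
```

With three elements the live iteration fails on the second call to the iterator, while the snapshot variant empties the list. Whether the list returned by {{getPredecessors}} behaves exactly like an {{ArrayList}} here is an assumption of this sketch.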
[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark
[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954304#comment-15954304 ]

liyunzhang_intel commented on PIG-5207:
---------------------------------------

[~szita]: One question about the modification in CombinerOptimizer.java.
{code}
for (PhysicalOperator originalUdfInput : pplan.getPredecessors(combineUdf).toArray(new PhysicalOperator[0])) {
    pplan.trimAbove(originalUdfInput);
    pplan.remove(originalUdfInput);
}
{code}
Do we really need to change the List to an array?
{code}
pplan.getPredecessors(combineUdf).toArray(new PhysicalOperator[0])
{code}
[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark
[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953426#comment-15953426 ]

Adam Szita commented on PIG-5207:
---------------------------------

[~rohini]: in our case the order of the edges in mToEdges (from one node to the others) matters, since the other nodes are POProject operators that each have a specific column set in them. That column order corresponds to the order the user specifies when calling the COR function, as in {{COR(var0col, var1col, var2col)}}. The order is held by the implementation, which is basically an {{ArrayList}}.

You can test this with the following use case, which is similar to what we have in this E2E test:
{code}
PhysicalPlan plan = new PhysicalPlan();

// Creating ops
PhysicalOperator proj0 = new POProject(new OperatorKey("scope", 0), 1, 1);
PhysicalOperator proj1 = new POProject(new OperatorKey("scope", 1), 1, 0);
PhysicalOperator proj2 = new POProject(new OperatorKey("scope", 2), 1, 1);
PhysicalOperator proj3 = new POProject(new OperatorKey("scope", 3), 1, 1);
PhysicalOperator proj4 = new POProject(new OperatorKey("scope", 4), 1, 1);
PhysicalOperator proj5 = new POProject(new OperatorKey("scope", 5), 1, 2);
POUserFunc udfOp = new POUserFunc(new OperatorKey("scope", 6), 1,
        Lists.newArrayList(proj1, proj3, proj5), new FuncSpec(COR.class.getCanonicalName()));

// Adding and connecting ops
plan.add(proj0);
plan.add(proj1);
plan.connect(proj0, proj1);
plan.add(proj2);
plan.add(proj3);
plan.connect(proj2, proj3);
plan.add(proj4);
plan.add(proj5);
plan.connect(proj4, proj5);
plan.add(udfOp);
plan.connect(proj1, udfOp);
plan.connect(proj3, udfOp);
plan.connect(proj5, udfOp);

PhysicalPlan clonedPlan = plan.clone();

// mToEdges is protected, so read it via reflection
Field f = OperatorPlan.class.getDeclaredField("mToEdges");
f.setAccessible(true);
MultiMap originalToEdgesMap = (MultiMap) f.get(plan);
MultiMap clonedToEdgesMap = (MultiMap) f.get(clonedPlan);

// Print the column of each predecessor of the UDF, in edge order
System.out.println("Original column order");
for (Object op : originalToEdgesMap.keySet()) {
    if (op instanceof POUserFunc) {
        for (Object entry : (List) (originalToEdgesMap.get(op))) {
            System.out.println(((POProject) entry).getColumn());
        }
    }
}

System.out.println("Cloned column order");
for (Object op : clonedToEdgesMap.keySet()) {
    if (op instanceof POUserFunc) {
        for (Object entry : (List) (clonedToEdgesMap.get(op))) {
            System.out.println(((POProject) entry).getColumn());
        }
    }
}
{code}
This gives me:
{code}
Original column order
0
1
2
Cloned column order
2
0
1
{code}
The plan constructed in the example is:
{code}
POUserFunc(org.apache.pig.builtin.COR)[tuple] - scope-6
|
|---Project[tuple][0] - scope-1
|   |
|   |---Project[tuple][1] - scope-0
|
|---Project[tuple][1] - scope-3
|   |
|   |---Project[tuple][1] - scope-2
|
|---Project[tuple][2] - scope-5
    |
    |---Project[tuple][1] - scope-4
{code}
[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark
[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15951729#comment-15951729 ]

Rohini Palaniswamy commented on PIG-5207:
-----------------------------------------

bq. This is due to another plan generation bug and it is actually observable in the plan above, that POUserFunc has Project 0,2,1 as its input (instead of 0,1,2).

That would be a serious bug, and we would have encountered it before, as we do a lot of cloning with union. Why do edges come into play for the inputs of POUserFunc? Isn't that handled by https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java#L268-L288 which clones the inputs in the exact order?
[jira] [Commented] (PIG-5207) BugFix e2e tests fail on spark
[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950940#comment-15950940 ]

Adam Szita commented on PIG-5207:
---------------------------------

This test case uses the COR UDF to calculate the correlation between 3 vars. The cause of the issue is that when execution reaches the Final implementation of the COR UDF, there are some junk bags in the input Tuple:
{code}
--input
  |--DefaultBag: {((943629.189954,19810.987,476680.0,52620.3574006,2.5723842E7)}
  |--DefaultBag: {((157499.53767599948,19810.987,52620.3574006,52620.3574006,503441.4849212208)}
  |--InternalCachedBag:
     |--0 = {BinSedesTuple@7900} "(157499.53767599948,19810.987,52620.3574006,52620.3574006,503441.4849212208)"
     |--1 = {Long@7901} "1"
     |--2 = {BinSedesTuple@7902} "(943629.189954,19810.987,476680.0,52620.3574006,2.5723842E7)"
     |--3 = {Long@7903} "1"
     |--4 = {BinSedesTuple@7904} "(2509050.849514,52620.3574006,476680.0,503441.4849212208,2.5723842E7)"
     |--5 = {Long@7905} "1"
{code}
The real result to be consumed is at position 2, but since we expect only 1 entry here, the implementation queries for {{.get(0)}} and shortly afterwards we get a ClassCastException.

*This is because of a fault in the CombinerOptimizer of Spark: it doesn't remove all original input projections in the POUserFunc.Final part*, as seen [here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L299]:
{{PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);}} //only the first input (this is why we didn't see this issue when using for example the AVG UDF)

As seen in the plan:
{code}
Spark node scope-38
D: Store(file:/tmp/temp-1343607396/tmp944122059:org.apache.pig.impl.io.InterStorage) - scope-37
|
|---D: New For Each(false,true)[tuple] - scope-47 (postReduceFE)
    |   |
    |   Project[chararray][0] - scope-39
    |   |
    |   Project[bag][1] - scope-43
    |   |
    |   |---Project[bag][1] - scope-42
    |
    |   POUserFunc(org.apache.pig.builtin.COR$Final)[bag] - scope-46
    |   |
    |   |---Project[bag][0] - scope-41 << residual unneeded projection, remains from the original UDF's input
    |   |   |
    |   |   |---Project[bag][1] - scope-40
    |   |
    |   |---Project[bag][2] - scope-45 << residual unneeded projection, remains from the original UDF's input
    |   |   |
    |   |   |---Project[bag][1] - scope-44
    |   |
    |   |---Project[bag][1] - scope-67 << actual subresult comes in here
    |
    |---C: Reduce By(false,false)[tuple] - scope-57 (cfe)
        |   |
        |   Project[chararray][0] - scope-58
{code}
After fixing this I saw that, although I now get good results, the order of the results is off. The actual correlation values between var0-var1, etc. are shifted with respect to MR's output.
This is due to another plan generation bug, and it is actually observable in the plan above: {{POUserFunc}} has Project {{0,2,1}} as its input (instead of {{0,1,2}}).
The root of this problem is the cloning of the ForEach operator [here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L141].
Cloning this triggers cloning of the associated PhysicalPlan instance, and unfortunately [that method|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java#L217] has a bug: it doesn't keep the order of the {{List}} lists in {{mToEdges}}, because it only considers {{mFromEdges}} when connecting the cloned Operator instances.
Keeping the ordering cannot be achieved by looking only at {{mToEdges}} or only at {{mFromEdges}}, since both operate with lists. As a fix, I'm sorting these lists in the cloned plan according to what was in the original plan (using the {{matches}} map, which maps original ops to cloned ops).

This doesn't come up in MR mode because the ForEach op is not cloned there but rather modified in place during combiner optimization. We could do the same in Spark too, but I feel this should rather be fixed in the common code for future clone() convenience.

[~kellyzly] can you review the spark parts in CombinerOptimizer.java please?
[~rohini] can you please review the common parts in PhysicalPlan.java?

Please find the fix in my patch: [^PIG-5207.0.patch]
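The reordering idea described in this comment can be sketched with plain Java collections. This is not the code from the patch: the class name, the {{restoreOrder}} helper and the use of strings instead of operators are all assumptions made for illustration. Only the idea is taken from the comment, namely sorting a cloned edge list by the position each element's original holds in the original list, using a map from original to cloned ops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; strings stand in for PhysicalOperator instances.
public class EdgeOrderSketch {

    /**
     * Sorts clonedEdges in place so that each clone sits at the position its
     * original occupies in originalEdges. matches maps original op -> cloned op.
     */
    static <T> void restoreOrder(List<T> originalEdges, List<T> clonedEdges, Map<T, T> matches) {
        final Map<T, Integer> rank = new HashMap<>();
        for (int i = 0; i < originalEdges.size(); i++) {
            rank.put(matches.get(originalEdges.get(i)), i);
        }
        clonedEdges.sort((a, b) -> Integer.compare(rank.get(a), rank.get(b)));
    }

    public static void main(String[] args) {
        List<String> original = Arrays.asList("proj1", "proj3", "proj5");

        Map<String, String> matches = new HashMap<>();
        for (String op : original) {
            matches.put(op, op + "'"); // pretend clone of each op
        }

        // Cloned edges arrive in the shifted order 2, 0, 1
        List<String> cloned = new ArrayList<>(Arrays.asList("proj5'", "proj1'", "proj3'"));
        restoreOrder(original, cloned, matches);
        System.out.println(cloned); // [proj1', proj3', proj5']
    }
}
```

The sketch assumes every cloned element has an entry in the map; a real implementation would have to decide what to do with operators that have no match.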