Author: xuefu
Date: Thu Feb 18 02:43:40 2016
New Revision: 1730995

URL: http://svn.apache.org/viewvc?rev=1730995&view=rev
Log:
PIG-4601: Implement Merge CoGroup for Spark engine (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
    pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java
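
For context, this is the Pig Latin pattern the patch enables on the Spark engine, modeled on the query in TestMapSideCogroup below. The loader class names are placeholders; per the compiler checks added in this patch, the base relation's loader must implement CollectableLoadFunc and the side relations' loaders must implement IndexableLoadFunc.

    A = LOAD 'input1' USING SomeCollectableLoader() AS (c1:chararray, c2:int);
    B = LOAD 'input2' USING SomeIndexableLoader() AS (c1:chararray, c2:int);
    C = COGROUP A BY c1, B BY c1 USING 'merge';
    DUMP C;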

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1730995&r1=1730994&r2=1730995&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Feb 18 02:43:40 2016
@@ -55,6 +55,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
@@ -76,6 +77,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
@@ -202,6 +204,7 @@ public class SparkLauncher extends Launc
         convertMap.put(PORank.class, new RankConverter());
         convertMap.put(POStream.class, new StreamConverter(confBytes));
         convertMap.put(POFRJoin.class, new FRJoinConverter());
+        convertMap.put(POMergeCogroup.class, new MergeCogroupConverter());
         convertMap.put(POReduceBySpark.class, new ReduceByConverter());
         convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1730995&r1=1730994&r2=1730995&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Thu Feb 18 02:43:40 2016
@@ -18,29 +18,38 @@
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
-import java.util.UUID;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.rdd.RDD;
 
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
-
 public class SparkUtil {
 
     public static <T> ClassTag<T> getManifest(Class<T> clazz) {
@@ -119,4 +128,25 @@ public class SparkUtil {
             return new MapReducePartitionerWrapper(customPartitioner, parallelism);
         }
     }
-}
+
+    // createIndexerSparkNode is a utility to create an indexer spark node with baseSparkOp
+    static public void createIndexerSparkNode(SparkOperator baseSparkOp, String scope, NodeIdGenerator nig) throws PlanException, ExecException {
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        PhysicalPlan ep = new PhysicalPlan();
+        POProject prj = new POProject(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+        prj.setStar(true);
+        prj.setOverloaded(false);
+        prj.setResultType(DataType.TUPLE);
+        ep.add(prj);
+        eps.add(ep);
+
+        List<Boolean> ascCol = new ArrayList<Boolean>();
+        ascCol.add(true);
+
+        int requestedParallelism = baseSparkOp.requestedParallelism;
+        POSort sort = new POSort(new OperatorKey(scope, nig.getNextNodeId(scope)), requestedParallelism, null, eps, ascCol, null);
+        // POSort is added to sort the index tuples generated by MergeJoinIndexer. For more detail, see PIG-4601.
+        baseSparkOp.physicalPlan.addAsLeaf(sort);
+    }
+}
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1730995&r1=1730994&r2=1730995&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Thu Feb 18 02:43:40 2016
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.plan;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,10 +31,16 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -47,6 +54,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -60,11 +68,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
-import org.apache.pig.builtin.LOG;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -73,6 +81,8 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Utils;
 
 /**
  * The compiler that compiles a given physical physicalPlan into a DAG of Spark
@@ -801,4 +811,173 @@ public class SparkCompiler extends PhyPl
                        finPlan.merge(e);
                }
        }
+
+    @Override
+    public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
+        if (compiledInputs.length < 2) {
+            int errCode = 2251;
+            String errMsg = "Merge Cogroup works on two or more relations. " +
+                    "To use map-side group-by on a single relation, use the 'collected' qualifier.";
+            throw new SparkCompilerException(errMsg, errCode);
+        }
+
+        List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length - 1);
+        List<String> fileSpecs = new ArrayList<String>(compiledInputs.length - 1);
+        List<String> loaderSigns = new ArrayList<String>(compiledInputs.length - 1);
+
+        try {
+            poCoGrp.setEndOfRecordMark(POStatus.STATUS_NULL);
+
+            // Iterate through all the SparkOperators, disconnect side SparkOperators from
+            // the spark plan and collect all the information needed in different lists.
+
+            for (int i = 0; i < compiledInputs.length; i++) {
+                SparkOperator sparkOper = compiledInputs[i];
+                PhysicalPlan plan = sparkOper.physicalPlan;
+                if (plan.getRoots().size() != 1) {
+                    int errCode = 2171;
+                    String errMsg = "Expected one but found more than one root physical operator in physical plan.";
+                    throw new SparkCompilerException(errMsg, errCode, PigException.BUG);
+                }
+
+                PhysicalOperator rootPOOp = plan.getRoots().get(0);
+                if (!(rootPOOp instanceof POLoad)) {
+                    int errCode = 2172;
+                    String errMsg = "Expected physical operator at root to be POLoad. Found : " + rootPOOp.getClass().getCanonicalName();
+                    throw new SparkCompilerException(errMsg, errCode);
+                }
+
+                POLoad sideLoader = (POLoad) rootPOOp;
+                FileSpec loadFileSpec = sideLoader.getLFile();
+                FuncSpec funcSpec = loadFileSpec.getFuncSpec();
+                LoadFunc loadfunc = sideLoader.getLoadFunc();
+                if (i == 0) {
+
+                    if (!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
+                        int errCode = 2252;
+                        throw new SparkCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", errCode);
+                    }
+
+                    ((CollectableLoadFunc) loadfunc).ensureAllKeyInstancesInSameSplit();
+                    continue;
+                }
+                if (!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
+                    int errCode = 2253;
+                    throw new SparkCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", errCode);
+                }
+
+                funcSpecs.add(funcSpec);
+                fileSpecs.add(loadFileSpec.getFileName());
+                loaderSigns.add(sideLoader.getSignature());
+                sparkPlan.remove(sparkOper);
+            }
+
+            poCoGrp.setSideLoadFuncs(funcSpecs);
+            poCoGrp.setSideFileSpecs(fileSpecs);
+            poCoGrp.setLoaderSignatures(loaderSigns);
+
+            // Use spark operator of base relation for the cogroup operation.
+            SparkOperator baseSparkOp = phyToSparkOpMap.get(poCoGrp.getInputs().get(0));
+
+            // Create a spark operator to generate index file for tuples from leftmost relation
+            SparkOperator indexerSparkOp = getSparkOp();
+            FileSpec idxFileSpec = getIndexingJob(indexerSparkOp, baseSparkOp, poCoGrp.getLRInnerPlansOf(0));
+            poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
+            poCoGrp.setIndexFileName(idxFileSpec.getFileName());
+
+            baseSparkOp.physicalPlan.addAsLeaf(poCoGrp);
+            for (FuncSpec funcSpec : funcSpecs)
+                baseSparkOp.UDFs.add(funcSpec.toString());
+
+            sparkPlan.add(indexerSparkOp);
+            sparkPlan.connect(indexerSparkOp, baseSparkOp);
+            phyToSparkOpMap.put(poCoGrp, baseSparkOp);
+            curSparkOp = baseSparkOp;
+        } catch (ExecException e) {
+            throw new SparkCompilerException(e.getDetailedMessage(), e.getErrorCode(), e.getErrorSource(), e);
+        } catch (SparkCompilerException mrce) {
+            throw (mrce);
+        } catch (CloneNotSupportedException e) {
+            throw new SparkCompilerException(e);
+        } catch (PlanException e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + poCoGrp.getClass().getCanonicalName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
+        } catch (IOException e) {
+            int errCode = 3000;
+            String errMsg = "IOException caught while compiling POMergeCoGroup";
+            throw new SparkCompilerException(errMsg, errCode, e);
+        }
+    }
+
+    // Sets up the indexing job for single-stage cogroups.
+    private FileSpec getIndexingJob(SparkOperator indexerSparkOp,
+                                    final SparkOperator baseSparkOp, final List<PhysicalPlan> mapperLRInnerPlans)
+            throws SparkCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
+
+        // First replace loader with  MergeJoinIndexer.
+        PhysicalPlan baseMapPlan = baseSparkOp.physicalPlan;
+        POLoad baseLoader = (POLoad) baseMapPlan.getRoots().get(0);
+        FileSpec origLoaderFileSpec = baseLoader.getLFile();
+        FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
+        LoadFunc loadFunc = baseLoader.getLoadFunc();
+
+        if (!(OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))) {
+            int errCode = 1104;
+            String errMsg = "Base relation of merge-coGroup must implement " +
+                    "OrderedLoadFunc interface. The specified loader "
+                    + funcSpec + " doesn't implement it";
+            throw new SparkCompilerException(errMsg, errCode);
+        }
+
+        String[] indexerArgs = new String[6];
+        indexerArgs[0] = funcSpec.toString();
+        indexerArgs[1] = ObjectSerializer.serialize((Serializable) mapperLRInnerPlans);
+        indexerArgs[3] = baseLoader.getSignature();
+        indexerArgs[4] = baseLoader.getOperatorKey().scope;
+        indexerArgs[5] = Boolean.toString(false); // we care for nulls.
+
+        PhysicalPlan phyPlan;
+        if (baseMapPlan.getSuccessors(baseLoader) == null
+                || baseMapPlan.getSuccessors(baseLoader).isEmpty()) {
+            // Load-Load-Cogroup case.
+            phyPlan = null;
+        } else { // We got something. Yank it and set it as inner plan.
+            phyPlan = baseMapPlan.clone();
+            PhysicalOperator root = phyPlan.getRoots().get(0);
+            phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
+            phyPlan.remove(root);
+
+        }
+        indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
+
+        POLoad idxJobLoader = getLoad();
+        idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
+                new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
+        indexerSparkOp.physicalPlan.add(idxJobLoader);
+        indexerSparkOp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
+
+        // Loader of sparkOp will return a tuple of form -
+        // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
+        // Create a spark node to retrieve index file by MergeJoinIndexer
+        SparkUtil.createIndexerSparkNode(indexerSparkOp, scope, nig);
+
+        POStore st = getStore();
+        FileSpec strFile = getTempFileSpec();
+        st.setSFile(strFile);
+        indexerSparkOp.physicalPlan.addAsLeaf(st);
+
+        return strFile;
+    }
+
+    /**
+     * Returns a temporary DFS Path
+     *
+     * @return
+     * @throws IOException
+     */
+    private FileSpec getTempFileSpec() throws IOException {
+        return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
+                new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1730995&r1=1730994&r2=1730995&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Thu Feb 18 02:43:40 2016
@@ -22,13 +22,17 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
@@ -40,12 +44,18 @@ public class PigInputFormatSpark extends
                        InterruptedException {
         init();
         resetUDFContext();
-        RecordReader recordReader = super.createRecordReader(split, context);
         //PigSplit#conf is the default hadoop configuration, we need get the configuration
         //from context.getConfigration() to retrieve pig properties
         PigSplit pigSplit = (PigSplit) split;
-        pigSplit.setConf(context.getConfiguration());
-        return recordReader;
+        Configuration conf = context.getConfiguration();
+        pigSplit.setConf(conf);
+        //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration().get(PigImplConstants.PIG_SPLIT_INDEX)
+        //which will be used in POMergeCogroup#setup
+        if (PigMapReduce.sJobContext == null) {
+            PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
+        }
+        PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
+        return super.createRecordReader(split, context);
     }
 
        private void resetUDFContext() {
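
A minimal consumer-side sketch: the comment above says POMergeCogroup#setup uses this value, but the exact call site is not part of this diff, so the snippet below is only an illustration under that assumption.

    // Hypothetical read of the split index stored by PigInputFormatSpark above
    int splitIndex = PigMapReduce.sJobContext.getConfiguration()
            .getInt(PigImplConstants.PIG_SPLIT_INDEX, 0); // falls back to 0 if the index was never set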

Modified: pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java?rev=1730995&r1=1730994&r2=1730995&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java Thu Feb 18 02:43:40 2016
@@ -311,12 +311,11 @@ public class TestMapSideCogroup {
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "," + INPUT_FILE4 + "' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray,c2:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' using "+ DummyIndexableLoader.class.getName()   +"() as (c1:chararray,c2:int);");
 
-        DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
+        List<Tuple> dbMergeCogrp = new ArrayList<Tuple>();
 
         pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';");
         Iterator<Tuple> iter = pigServer.openIterator("C");
 
-
         while(iter.hasNext()) {
             Tuple t = iter.next();
             dbMergeCogrp.add(t);
@@ -335,12 +334,29 @@ public class TestMapSideCogroup {
                 "(3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)})"
         };
 
-        assertEquals(9, dbMergeCogrp.size());
+        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(results);
+
+        //We need to sort dbMergeCogrp because the result order differs between spark and other modes when
+        //multiple files are loaded (LOAD INPUT_FILE1,INPUT_FILE4...)
+        for (Tuple t : dbMergeCogrp) {
+            Util.convertBagToSortedBag(t);
+        }
+        for (Tuple t : expected) {
+            Util.convertBagToSortedBag(t);
+        }
+
+        Collections.sort(dbMergeCogrp);
+        Collections.sort(expected);
+        assertEquals(dbMergeCogrp.size(), expected.size());
+
+        //Since TestMapSideCogroup.DummyIndexableLoader.getNext() does not
+        //apply the schema to each input tuple, Util#checkQueryOutputsAfterSortRecursive fails to assert.
+        // The schema for C is (int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,int),(chararray,int),(chararray,int)}).
+        //But the schema for result "dbMergeCogrp" is (int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,chararray),(chararray,chararray),(chararray,chararray)})
         Iterator<Tuple> itr = dbMergeCogrp.iterator();
-        for(int i=0; i<9; i++){
-            assertEquals(itr.next().toString(), results[i]);   
+        for (int i = 0; i < dbMergeCogrp.size(); i++) {
+            assertEquals(itr.next().toString(), expected.get(i).toString());
         }
-        assertFalse(itr.hasNext());
     }
 
     @Test

