Added: pig/trunk/test/org/apache/pig/spark/TestIndexedKey.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/spark/TestIndexedKey.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/spark/TestIndexedKey.java (added)
+++ pig/trunk/test/org/apache/pig/spark/TestIndexedKey.java Mon May 29 15:00:39 
2017
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.spark;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.IndexedKey;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the {@code equals} and {@code hashCode} behaviour of
+ * {@link IndexedKey}, covering plain keys, null keys and tuple keys.
+ */
+@RunWith(JUnit4.class)
+public class TestIndexedKey {
+
+    /**
+     * Case1:Compare IndexedKeys with same index value
+     * <pre>
+     * key1    key2    equal?  hashCode1        hashCode2
+     * foo     null      N     hashCode(foo)    index
+     * null    foo       N     index            hashCode(foo)
+     * foo     foo       Y     hashCode(foo)    hashCode(foo)
+     * null    null      Y     index            index
+     * (1,1)   (1,1)     Y     hashCode((1,1))  hashCode((1,1))
+     * (1,)    (1,)      Y     hashCode((1,))   hashCode((1,))
+     * (1,1)   (1,2)     N     hashCode((1,1))  hashCode((1,2))
+     * </pre>
+     */
+    @Test
+    public void testIndexedKeyWithSameIndexValue() throws Exception {
+        assertPair("foo vs null", key("0", "foo"), key("0", null), false, false);
+        assertPair("null vs foo", key("0", null), key("0", "foo"), false, false);
+        assertPair("foo vs foo", key("0", "foo"), key("0", "foo"), true, true);
+        assertPair("null vs null", key("0", null), key("0", null), true, true);
+        assertPair("(1,1) vs (1,1)", key("0", pair("1", "1")), key("0", pair("1", "1")), true, true);
+        assertPair("(1,) vs (1,)", key("0", pair("1", null)), key("0", pair("1", null)), true, true);
+        assertPair("(1,1) vs (1,2)", key("0", pair("1", "1")), key("0", pair("1", "2")), false, false);
+    }
+
+    /**
+     * Case2:Compare IndexedKeys with different index value
+     * <pre>
+     * key1    key2    equal?  hashCode1        hashCode2
+     * foo     null     N      hashCode(foo)    index2
+     * null    foo      N      index1           hashCode(foo)
+     * foo     foo      Y      hashCode(foo)    hashCode(foo)
+     * null    null     N      index1           index2
+     * (1,1)   (1,1)    Y      hashCode((1,1))  hashCode((1,1))
+     * (1,)    (1,)     N      hashCode((1,))   hashCode((1,))
+     * (1,1)   (1,2)    N      hashCode((1,1))  hashCode((1,2))
+     * </pre>
+     */
+    @Test
+    public void testIndexedKeyWithDifferentIndexValue() throws Exception {
+        assertPair("foo vs null", key("0", "foo"), key("1", null), false, false);
+        assertPair("null vs foo", key("0", null), key("1", "foo"), false, false);
+        assertPair("foo vs foo", key("0", "foo"), key("1", "foo"), true, true);
+        assertPair("null vs null", key("0", null), key("1", null), false, false);
+        assertPair("(1,1) vs (1,1)", key("0", pair("1", "1")), key("1", pair("1", "1")), true, true);
+        // The hash codes match here even though the two keys are not equal.
+        // NOTE(review): presumably a null tuple field combined with a different
+        // index makes the keys unequal - confirm against IndexedKey.equals.
+        assertPair("(1,) vs (1,)", key("0", pair("1", null)), key("1", pair("1", null)), false, true);
+        assertPair("(1,1) vs (1,2)", key("0", pair("1", "1")), key("1", pair("1", "2")), false, false);
+    }
+
+    /**
+     * Builds an IndexedKey from a decimal index string and an arbitrary key value.
+     *
+     * @param index decimal text of the byte index, e.g. {@code "0"}
+     * @param value the key object; may be {@code null}
+     */
+    private static IndexedKey key(String index, Object value) {
+        // Byte.valueOf replaces the deprecated new Byte(String) constructor.
+        return new IndexedKey(Byte.valueOf(index), value);
+    }
+
+    /**
+     * Builds a fresh two-field tuple so that each side of a comparison gets
+     * its own instance.
+     *
+     * @param first  value stored in field 0
+     * @param second value stored in field 1; may be {@code null}
+     */
+    private static Tuple pair(Object first, Object second) throws Exception {
+        Tuple tuple = TupleFactory.getInstance().newTuple(2);
+        tuple.set(0, first);
+        tuple.set(1, second);
+        return tuple;
+    }
+
+    /**
+     * Asserts both the equality relation and the hash-code relation of two keys.
+     * The expected value is passed first to assertEquals so that JUnit failure
+     * messages report expected/actual the right way round.
+     *
+     * @param label          short description used in failure messages
+     * @param left           first key
+     * @param right          second key
+     * @param expectEqual    whether {@code left.equals(right)} should hold
+     * @param expectSameHash whether both hash codes should be identical
+     */
+    private static void assertPair(String label, IndexedKey left, IndexedKey right,
+            boolean expectEqual, boolean expectSameHash) {
+        assertEquals(label + ": equals", expectEqual, left.equals(right));
+        assertEquals(label + ": same hashCode", expectSameHash,
+                left.hashCode() == right.hashCode());
+    }
+}

Added: pig/trunk/test/org/apache/pig/spark/TestSecondarySortSpark.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/spark/TestSecondarySortSpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/spark/TestSecondarySortSpark.java (added)
+++ pig/trunk/test/org/apache/pig/spark/TestSecondarySortSpark.java Mon May 29 
15:00:39 2017
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.spark;
+
+import 
org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.test.MiniGenericCluster;
+import org.apache.pig.test.TestSecondarySort;
+import org.apache.pig.test.Util;
+
+/**
+ * Spark flavour of {@link TestSecondarySort}: supplies a Spark mini cluster
+ * and runs the secondary key optimizer over a compiled Spark plan.
+ */
+public class TestSecondarySortSpark extends TestSecondarySort {
+
+    public TestSecondarySortSpark() {
+        super();
+    }
+
+    /**
+     * @return the cluster built for the Spark exec type
+     */
+    @Override
+    public MiniGenericCluster getCluster() {
+        final String execType = MiniGenericCluster.EXECTYPE_SPARK;
+        return MiniGenericCluster.buildCluster(execType);
+    }
+
+    /**
+     * Compiles {@code query} into a Spark plan and visits it with
+     * {@link SecondaryKeyOptimizerSpark}.
+     *
+     * @param query the Pig Latin script to compile
+     * @return the optimizer after it has visited the plan
+     */
+    @Override
+    public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query) throws Exception {
+        // pigServer and pc are not declared here; presumably inherited from
+        // TestSecondarySort.
+        PhysicalPlan physicalPlan = Util.buildPp(pigServer, query);
+
+        // compile() must run before the Spark plan is fetched.
+        SparkCompiler compiler = new SparkCompiler(physicalPlan, pc);
+        compiler.compile();
+
+        SecondaryKeyOptimizerSpark secondaryKeyOptimizer =
+                new SecondaryKeyOptimizerSpark(compiler.getSparkPlan());
+        secondaryKeyOptimizer.visit();
+        return secondaryKeyOptimizer;
+    }
+}

Added: pig/trunk/test/org/apache/pig/spark/TestSparkCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/spark/TestSparkCompiler.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/spark/TestSparkCompiler.java (added)
+++ pig/trunk/test/org/apache/pig/spark/TestSparkCompiler.java Mon May 29 
15:00:39 2017
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.spark;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Properties;
+import java.util.Random;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.TransformerException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLocalExecType;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.DotSparkPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.plan.XMLSparkPrinter;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.Util;
+import org.apache.pig.test.utils.TestHelper;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test cases to test the SparkCompiler. VERY IMPORTANT NOTE: The tests here
+ * compare results with a "golden" set of outputs. In each test case here, the
+ * operators generated have a random operator key which uses Java's Random
+ * class. So if there is a code change which changes the number of operators
+ * created in a plan, then the "golden" file for that test case
+ * needs to be changed.
+ */
+
+public class TestSparkCompiler {
+    private static PigContext pc;
+    private static PigServer pigServer;
+    // Chunk size used when reading a golden file; files of any length are read fully.
+    private static final int READ_CHUNK_SIZE = 8192;
+
+    /** Output formats in which a compiled Spark plan can be rendered for comparison. */
+    private enum PlanPrinter {
+        TEXT,
+        DOT,
+        XML;
+
+        /**
+         * Renders {@code plan} into {@code ps} using the printer for this format.
+         *
+         * @param ps   destination stream
+         * @param plan the compiled Spark plan
+         * @throws RuntimeException for {@link #DOT}, which is not supported yet
+         */
+        public void doPrint(PrintStream ps, SparkOperPlan plan)
+                throws VisitorException, ParserConfigurationException, TransformerException {
+            switch (this) {
+                case DOT:
+                    // TODO: once DOT comparison is supported, use
+                    // (new DotSparkPrinter(plan, ps)).dump();
+                    throw new RuntimeException("Testing in DOT format not supported yet");
+                case XML:
+                    XMLSparkPrinter printer = new XMLSparkPrinter(ps, plan);
+                    printer.visit();
+                    printer.closePlan();
+                    break;
+                case TEXT:
+                default:
+                    (new SparkPrinter(ps, plan)).visit();
+                    break;
+            }
+        }
+    }
+
+    // If for some reason, the golden files need to be regenerated, set this to
+    // true - THIS WILL OVERWRITE THE GOLDEN FILES - So use this with caution
+    // and only for the test cases you need and are sure of.
+    private boolean generate = false;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        resetFileLocalizer();
+        pc = new PigContext(new SparkLocalExecType(), new Properties());
+        FileUtils.deleteDirectory(new File("/tmp/pigoutput"));
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        resetFileLocalizer();
+    }
+
+    @Before
+    public void setUp() throws ExecException {
+        resetScope();
+        pigServer = new PigServer(pc);
+    }
+
+    /** Resets the id generator and the server scope before each test. */
+    private void resetScope() {
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+    }
+
+    private static void resetFileLocalizer() {
+        FileLocalizer.deleteTempFiles();
+        FileLocalizer.setInitialized(false);
+        // Set random seed to generate deterministic temporary paths
+        FileLocalizer.setR(new Random(1331L));
+    }
+
+    @Test
+    public void testStoreLoad() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' as (x:int, y:int);" +
+                "store a into 'file:///tmp/pigoutput';" +
+                "b = load 'file:///tmp/pigoutput' as (x:int, y:int);" +
+                "store b into 'file:///tmp/pigoutput1';";
+
+        run(query,
+                "test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld",
+                PlanPrinter.TEXT);
+        run(query,
+                "test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld",
+                PlanPrinter.XML);
+        //TODO: enable this when DOT file comparison is supported
+        //run(query,
+        //        "test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld",
+        //        PlanPrinter.DOT);
+    }
+
+    /**
+     * Compiles {@code query} to a Spark plan, prints it with {@code planPrinter}
+     * and compares the result with the golden file. When {@link #generate} is
+     * set, the golden file is overwritten instead and nothing is compared.
+     *
+     * @param query        the Pig Latin script to compile
+     * @param expectedFile path of the golden file
+     * @param planPrinter  format used to render the plan
+     */
+    private void run(String query, String expectedFile, PlanPrinter planPrinter) throws Exception {
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        SparkLauncher launcher = new SparkLauncher();
+        pc.inExplain = true;
+        SparkOperPlan sparkOperPlan = launcher.compile(pp, pc);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        planPrinter.doPrint(ps, sparkOperPlan);
+        // Defensive flush so that everything printed is visible in baos.
+        ps.flush();
+        String compiledPlan = baos.toString();
+        System.out.println();
+        System.out.println("<<<" + compiledPlan + ">>>");
+
+        if (generate) {
+            FileOutputStream fos = new FileOutputStream(expectedFile);
+            try {
+                fos.write(baos.toByteArray());
+            } finally {
+                // Close even when the write fails so the handle is not leaked.
+                fos.close();
+            }
+            return;
+        }
+
+        String goldenPlan = readFully(expectedFile);
+        // Work on the decoded String length, not on a byte count, and tolerate
+        // an empty golden file (the assertion below then reports the mismatch).
+        if (goldenPlan.endsWith("\n")) {
+            goldenPlan = goldenPlan.substring(0, goldenPlan.length() - 1);
+        }
+
+        System.out.println("-------------");
+        System.out.println("Golden");
+        System.out.println("<<<" + goldenPlan + ">>>");
+        System.out.println("-------------");
+
+        String goldenPlanClean = Util.standardizeNewline(goldenPlan).trim();
+        String compiledPlanClean = Util.standardizeNewline(compiledPlan).trim();
+        assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)),
+                TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean)));
+    }
+
+    /**
+     * Reads the whole file at {@code path} and decodes it with the platform
+     * default charset (the same decoding used for the compiled plan).
+     * A single {@code read} call may return fewer bytes than the file holds,
+     * so the stream is drained in a loop until end of file.
+     *
+     * @param path file to read
+     * @return the file contents; empty string for an empty file
+     */
+    private static String readFully(String path) throws java.io.IOException {
+        FileInputStream in = new FileInputStream(path);
+        try {
+            ByteArrayOutputStream contents = new ByteArrayOutputStream();
+            byte[] chunk = new byte[READ_CHUNK_SIZE];
+            int count;
+            while ((count = in.read(chunk)) != -1) {
+                contents.write(chunk, 0, count);
+            }
+            return contents.toString();
+        } finally {
+            in.close();
+        }
+    }
+
+}
+

Modified: pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java (original)
+++ pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java Mon May 29 
15:00:39 2017
@@ -48,6 +48,7 @@ abstract public class MiniGenericCluster
 
     public static String EXECTYPE_MR = "mr";
     public static String EXECTYPE_TEZ = "tez";
+    public static String EXECTYPE_SPARK = "spark";
 
     /**
      * Returns the single instance of class MiniGenericCluster that represents
@@ -75,6 +76,8 @@ abstract public class MiniGenericCluster
                 INSTANCE = new MiniCluster();
             } else if (execType.equalsIgnoreCase(EXECTYPE_TEZ)) {
                 INSTANCE = new TezMiniCluster();
+            } else if (execType.equalsIgnoreCase(EXECTYPE_SPARK)) {
+                INSTANCE = new SparkMiniCluster();
             } else {
                 throw new RuntimeException("Unknown test.exec.type: " + 
execType);
             }
@@ -157,7 +160,9 @@ abstract public class MiniGenericCluster
             return MiniCluster.getLauncher();
         } else if (execType.equalsIgnoreCase(EXECTYPE_TEZ)) {
             return TezMiniCluster.getLauncher();
-        } else {
+        } else if(execType.equalsIgnoreCase(EXECTYPE_SPARK)){
+            return SparkMiniCluster.getLauncher();
+        } else{
             throw new RuntimeException("Unknown test.exec.type: " + execType);
         }
     }

Added: pig/trunk/test/org/apache/pig/test/SparkMiniCluster.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/SparkMiniCluster.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/SparkMiniCluster.java (added)
+++ pig/trunk/test/org/apache/pig/test/SparkMiniCluster.java Mon May 29 
15:00:39 2017
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
+
+/**
+ * Mini cluster for Spark tests, built on {@link YarnMiniCluster}; it reports
+ * a {@link SparkExecType} and hands out {@link SparkLauncher} instances.
+ */
+public class SparkMiniCluster extends YarnMiniCluster {
+
+    private static final Log LOG = LogFactory.getLog(SparkMiniCluster.class);
+
+    // Exec type instance returned by getExecType(); created once per cluster object.
+    private ExecType sparkExecType = new SparkExecType();
+
+    /**
+     * @return a new launcher for the Spark backend
+     */
+    public static Launcher getLauncher() {
+        return new SparkLauncher();
+    }
+
+    /**
+     * @return the Spark exec type held by this cluster
+     */
+    @Override
+    public ExecType getExecType() {
+        return sparkExecType;
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestAssert.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAssert.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAssert.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAssert.java Mon May 29 15:00:39 2017
@@ -116,7 +116,8 @@ public class TestAssert {
       try {
           pigServer.openIterator("A");
       } catch (FrontendException fe) {
-          if 
(pigServer.getPigContext().getExecType().toString().startsWith("TEZ")) {
+          if 
(pigServer.getPigContext().getExecType().toString().startsWith("TEZ")
+                  || 
pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) {
               Assert.assertTrue(fe.getCause().getMessage().contains(
                       "Assertion violated: i should be greater than 1"));
           } else {
@@ -147,7 +148,8 @@ public class TestAssert {
       try {
           pigServer.openIterator("A");
       } catch (FrontendException fe) {
-          if 
(pigServer.getPigContext().getExecType().toString().startsWith("TEZ")) {
+          if 
(pigServer.getPigContext().getExecType().toString().startsWith("TEZ")
+                  || 
pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) {
               Assert.assertTrue(fe.getCause().getMessage().contains(
                       "Assertion violated: i should be greater than 1"));
           } else {

Modified: pig/trunk/test/org/apache/pig/test/TestCase.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCase.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCase.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCase.java Mon May 29 15:00:39 2017
@@ -29,6 +29,7 @@ import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.junit.Test;
 
 public class TestCase {
@@ -267,10 +268,15 @@ public class TestCase {
         pigServer.registerQuery("STORE C INTO 'bar' USING mock.Storage();");
 
         List<Tuple> out = data.get("bar");
-        assertEquals(3, out.size());
-        assertEquals(tuple(1, "3n+1", bag(tuple("a","x"), tuple("a","y"))), 
out.get(0));
-        assertEquals(tuple(2, "3n+2", bag(tuple("b","x"), tuple("b","y"))), 
out.get(1));
-        assertEquals(tuple(3, "3n",   bag(tuple("c","x"), tuple("c","y"))), 
out.get(2));
+
+        String[] expected = new String[] {
+                "(1,3n+1,{(a,x),(a,y)})",
+                "(2,3n+2,{(b,x),(b,y)})",
+                "(3,3n,{(c,x),(c,y)})"
+        };
+        Schema s = pigServer.dumpSchema("C");
+        Util.checkQueryOutputs(out.iterator(), expected, 
org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(Util.getLocalTestMode()));
     }
 
     /**

Modified: pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Mon May 29 
15:00:39 2017
@@ -25,7 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.CollectableLoadFunc;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
@@ -296,13 +295,13 @@ public class TestCollectedGroup {
     @Test
     public void testMapsideGroupWithMergeJoin() throws IOException{
         pigServer = new PigServer(cluster.getExecType(), 
cluster.getProperties());
-        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using 
"+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
-        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using 
"+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using " + 
DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using " + 
DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
         try {
             DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
             DataBag dbshj = BagFactory.getInstance().newDefaultBag();
             {
-               pigServer.registerQuery("C = join A by id, B by id using 
'merge';");
+                pigServer.registerQuery("C = join A by id, B by id using 
'merge';");
                 pigServer.registerQuery("D = group C by A::id using 
'collected';");
                 pigServer.registerQuery("E = foreach D generate group, 
COUNT(C);");
                 Iterator<Tuple> iter = pigServer.openIterator("E");
@@ -312,7 +311,7 @@ public class TestCollectedGroup {
                 }
             }
             {
-               pigServer.registerQuery("F = join A by id, B by id;");
+                pigServer.registerQuery("F = join A by id, B by id;");
                 pigServer.registerQuery("G = group F by A::id;");
                 pigServer.registerQuery("H = foreach G generate group, 
COUNT(F);");
                 Iterator<Tuple> iter = pigServer.openIterator("H");
@@ -321,7 +320,7 @@ public class TestCollectedGroup {
                     dbshj.add(iter.next());
                 }
             }
-            Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+            Assert.assertTrue(dbfrj.size() > 0 && dbshj.size() > 0);
             Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
 
         } catch (Exception e) {
@@ -337,4 +336,4 @@ public class TestCollectedGroup {
         }
 
     }
-}
+}
\ No newline at end of file

Modified: pig/trunk/test/org/apache/pig/test/TestCombiner.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCombiner.java Mon May 29 15:00:39 
2017
@@ -116,13 +116,17 @@ public class TestCombiner {
         inputLines.add("a,c,1");
         String inputFileName = loadWithTestLoadFunc("A", pig, inputLines);
 
-        pig.registerQuery("B = group A by ($0, $1);");
-        pig.registerQuery("C = foreach B generate flatten(group), COUNT($1);");
-        Iterator<Tuple> resultIterator = pig.openIterator("C");
-        Tuple tuple = resultIterator.next();
-        assertEquals("(a,b,2)", tuple.toString());
-        tuple = resultIterator.next();
-        assertEquals("(a,c,1)", tuple.toString());
+        pig.registerQuery("B = foreach A generate $0 as (c0:chararray), $1 as 
(c1:chararray), $2 as (c2:int);");
+        pig.registerQuery("C = group B by ($0, $1);");
+        pig.registerQuery("D = foreach C generate flatten(group), COUNT($1) as 
int;");
+        // Since the input has no schema, using 
Util.getTuplesFromConstantTupleStrings fails assert.
+        List<Tuple> resultTuples = Util.getTuplesFromConstantTupleStrings(
+            new String[]{
+                "('a','b',2)",
+                "('a','c',1)",
+            });
+        Iterator<Tuple> resultIterator = pig.openIterator("D");
+        Util.checkQueryOutputsAfterSort(resultIterator, resultTuples);
 
         return inputFileName;
     }
@@ -185,7 +189,7 @@ public class TestCombiner {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
         pigServer.explain("c", ps);
-        assertTrue(baos.toString().matches("(?si).*combine plan.*"));
+        checkCombinerUsed(pigServer, "c", true);
 
         Iterator<Tuple> it = pigServer.openIterator("c");
         Tuple t = it.next();
@@ -235,7 +239,7 @@ public class TestCombiner {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
         pigServer.explain("c", ps);
-        assertTrue(baos.toString().matches("(?si).*combine plan.*"));
+        checkCombinerUsed(pigServer, "c", true);
 
         HashMap<String, Object[]> results = new HashMap<String, Object[]>();
         results.put("pig1", new Object[] { "pig1", 3L, 57L, 5.2, 75L, 9.4, 3L, 
3L, 57L });
@@ -256,6 +260,56 @@ public class TestCombiner {
     }
 
     @Test
+    public void testGroupAndUnion() throws Exception {
+        // test that a union of two grouped-and-aggregated relations yields the per-group COUNT and SUM
+        String input1[] = {
+                "ABC\t1\ta\t1",
+                "ABC\t1\tb\t2",
+                "ABC\t1\ta\t3",
+                "ABC\t2\tb\t4",
+        };
+
+        Util.createInputFile(cluster, "testGroupElements1.txt", input1);
+        PigServer pigServer = new PigServer(cluster.getExecType(), properties);
+        pigServer.debugOn();
+        pigServer.registerQuery("a1 = load 'testGroupElements1.txt' " +
+                "as (str:chararray, num1:int, alph : chararray, num2 : int);");
+        pigServer.registerQuery("b1 = group a1 by str;");
+
+        // first union input: per-group COUNT of alph and SUM of num2 over a1
+        pigServer.registerQuery("c1 = foreach b1  generate flatten(group), 
COUNT(a1.alph), SUM(a1.num2); ");
+
+        String input2[] = {
+                "DEF\t2\ta\t3",
+                "DEF\t2\td\t5",
+        };
+
+        Util.createInputFile(cluster, "testGroupElements2.txt", input2);
+        pigServer.registerQuery("a2 = load 'testGroupElements2.txt' " +
+                "as (str:chararray, num1:int, alph : chararray, num2 : int);");
+        pigServer.registerQuery("b2 = group a2 by str;");
+
+        // second union input: the same aggregation over a2
+        pigServer.registerQuery("c2 = foreach b2  generate flatten(group), 
COUNT(a2.alph), SUM(a2.num2); ");
+
+        pigServer.registerQuery("c = union c1, c2;");
+
+        List<Tuple> expectedRes =
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[]{
+                                "('ABC',4L,10L)",
+                                "('DEF',2L,8L)",
+                        });
+
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+
+        Util.deleteFile(cluster, "testGroupElements1.txt");
+        Util.deleteFile(cluster, "testGroupElements2.txt");
+        pigServer.shutdown();
+    }
+
+    @Test
     public void testGroupElements() throws Exception {
         // test use of combiner when group elements are accessed in the foreach
         String input[] = {
@@ -352,13 +406,19 @@ public class TestCombiner {
         pigServer.shutdown();
     }
 
-    private void checkCombinerUsed(PigServer pigServer, String string, boolean 
combineExpected)
+    private void checkCombinerUsed(PigServer pigServer, String alias, boolean 
combineExpected)
             throws IOException {
         // make sure there is a combine plan in the explain output
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
-        pigServer.explain("c", ps);
-        boolean combinerFound = baos.toString().matches("(?si).*combine 
plan.*");
+        pigServer.explain(alias, ps);
+        boolean combinerFound;
+        if 
(pigServer.getPigContext().getExecType().name().equalsIgnoreCase("spark")) {
+            combinerFound = baos.toString().contains("Reduce By");
+        } else {
+            combinerFound = baos.toString().matches("(?si).*combine plan.*");
+        }
+
         System.out.println(baos.toString());
         assertEquals("is combiner present as expected", combineExpected, 
combinerFound);
     }

Modified: pig/trunk/test/org/apache/pig/test/TestCubeOperator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCubeOperator.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCubeOperator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCubeOperator.java Mon May 29 
15:00:39 2017
@@ -566,8 +566,9 @@ public class TestCubeOperator {
     public void testIllustrate() throws Exception {
        // test for illustrate
         Assume.assumeTrue("illustrate does not work in tez (PIG-3993)", 
!Util.getLocalTestMode().toString().startsWith("TEZ"));
-       String query = "a = load 'input' USING mock.Storage() as 
(a1:chararray,b1:chararray,c1:long); "
-               + "b = cube a by cube(a1,b1);";
+        Assume.assumeTrue("illustrate does not work in spark (PIG-4621)", 
!Util.getLocalTestMode().toString().startsWith("SPARK"));
+        String query = "a = load 'input' USING mock.Storage() as 
(a1:chararray,b1:chararray,c1:long); "
+            + "b = cube a by cube(a1,b1);";
 
         Util.registerMultiLineQuery(pigServer, query);
         Map<Operator, DataBag> examples = pigServer.getExamples("b");

Modified: pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java Mon May 29 
15:00:39 2017
@@ -33,6 +33,7 @@ import org.apache.pig.PigRunner;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -85,6 +86,8 @@ public class TestEmptyInputDir {
                 assertEquals(0, js.getNumberMaps());
             }
 
+            //Spark doesn't create an empty result file part-*, only a 
_SUCCESS file if input dir was empty
+            Assume.assumeTrue("Skip this test for Spark. See PIG-5140", 
!Util.isSparkExecType(cluster.getExecType()));
             assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Mon May 29 
15:00:39 2017
@@ -24,6 +24,7 @@ import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -422,9 +423,15 @@ public class TestEvalPipeline {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-        int numIdentity = 0;
+        List<Tuple> actualResList = new ArrayList<Tuple>();
         while(iter.hasNext()){
-            Tuple t = iter.next();
+            actualResList.add(iter.next());
+        }
+
+        Util.sortQueryOutputsIfNeed(actualResList, 
Util.isSparkExecType(cluster.getExecType()));
+
+        int numIdentity = 0;
+        for (Tuple t : actualResList) {
             Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0));
             Assert.assertEquals((Long)5L, (Long)t.get(2));
             Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
@@ -467,9 +474,15 @@ public class TestEvalPipeline {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-        int numIdentity = 0;
+        List<Tuple> actualResList = new ArrayList<Tuple>();
         while(iter.hasNext()){
-            Tuple t = iter.next();
+            actualResList.add(iter.next());
+        }
+
+        Util.sortQueryOutputsIfNeed(actualResList, 
Util.isSparkExecType(cluster.getExecType()));
+
+        int numIdentity = 0;
+        for (Tuple t : actualResList) {
             Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0));
             Assert.assertEquals((Long)5L, (Long)t.get(2));
             Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
@@ -842,9 +855,15 @@ public class TestEvalPipeline {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-        int numIdentity = 0;
+        List<Tuple> actualResList = new ArrayList<Tuple>();
         while(iter.hasNext()){
-            Tuple t = iter.next();
+            actualResList.add(iter.next());
+        }
+
+        Util.sortQueryOutputsIfNeed(actualResList, 
Util.isSparkExecType(cluster.getExecType()));
+
+        int numIdentity = 0;
+        for (Tuple t : actualResList) {
             Assert.assertEquals((Integer)((numIdentity + 1) * 10), 
(Integer)t.get(0));
             Assert.assertEquals((Long)10L, (Long)t.get(1));
             Assert.assertEquals((Long)5L, (Long)t.get(2));
@@ -873,6 +892,10 @@ public class TestEvalPipeline {
         pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = distinct A;");
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            pigServer.registerQuery("B = order B by *;");
+        }
+
         String query = "C = foreach B {"
         + "C1 = $1 - $0;"
         + "C2 = $1%2;"
@@ -883,7 +906,6 @@ public class TestEvalPipeline {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-
         int numRows = 0;
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j = 0; j < LOOP_COUNT; j+=2){
@@ -920,6 +942,10 @@ public class TestEvalPipeline {
         pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = distinct A;");
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            pigServer.registerQuery("B = order B by *;");
+        }
+
         String query = "C = foreach B {"
         + "C1 = $0 + $1;"
         + "C2 = C1 + $0;"
@@ -929,7 +955,6 @@ public class TestEvalPipeline {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-
         int numRows = 0;
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j = 0; j < LOOP_COUNT; j+=2){

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon May 29 
15:00:39 2017
@@ -138,9 +138,15 @@ public class TestEvalPipeline2 {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-        int numIdentity = 0;
+        List<Tuple> actualResList = new ArrayList<Tuple>();
         while(iter.hasNext()){
-            Tuple tuple = iter.next();
+            actualResList.add(iter.next());
+        }
+
+        
Util.sortQueryOutputsIfNeed(actualResList,Util.isSparkExecType(cluster.getExecType()));
+
+        int numIdentity = 0;
+        for (Tuple tuple : actualResList) {
             Tuple t = (Tuple)tuple.get(0);
             Assert.assertEquals(DataByteArray.class, t.get(0).getClass());
             int group = Integer.parseInt(new 
String(((DataByteArray)t.get(0)).get()));
@@ -476,17 +482,24 @@ public class TestEvalPipeline2 {
         pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;");
         Iterator<Tuple> iter = pigServer.openIterator("D");
 
-        Assert.assertTrue(iter.hasNext());
-        Tuple t = iter.next();
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            String[] expectedResults =
+                new String[] {"(2,{(2,2)},{(2,5,2)})", "(1,{(1,1)},{(1,2,3)})" 
};
+            Util.checkQueryOutputs(iter, expectedResults,
+                
org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), 
Util.isSparkExecType(cluster.getExecType()));
+        } else {
+            Assert.assertTrue(iter.hasNext());
+            Tuple t = iter.next();
 
-        Assert.assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));
+            Assert.assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));
 
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
+            Assert.assertTrue(iter.hasNext());
+            t = iter.next();
 
-        Assert.assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));
+            Assert.assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));
 
-        Assert.assertFalse(iter.hasNext());
+            Assert.assertFalse(iter.hasNext());
+        }
     }
 
     // See PIG-1195
@@ -739,16 +752,10 @@ public class TestEvalPipeline2 {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("EventsPerMinute");
 
-        Tuple t = iter.next();
-        Assert.assertTrue( (Long)t.get(0) == 60000 && (Long)t.get(1) == 2 && 
(Long)t.get(2) == 3 );
-
-        t = iter.next();
-        Assert.assertTrue( (Long)t.get(0) == 120000 && (Long)t.get(1) == 2 && 
(Long)t.get(2) == 2 );
-
-        t = iter.next();
-        Assert.assertTrue( (Long)t.get(0) == 240000 && (Long)t.get(1) == 1 && 
(Long)t.get(2) == 1 );
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+            new String[]{"(60000L,2L,3L)", "(120000L,2L,2L)", 
"(240000L,1L,1L)"});
 
-        Assert.assertFalse(iter.hasNext());
+        Util.checkQueryOutputs(iter, expectedResults, 
Util.isSparkExecType(cluster.getExecType()));
     }
 
     // See PIG-1729
@@ -1580,6 +1587,9 @@ public class TestEvalPipeline2 {
 
         pigServer.registerQuery("data = load 'table_testLimitFlatten' as 
(k,v);");
         pigServer.registerQuery("grouped = GROUP data BY k;");
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            pigServer.registerQuery("grouped = ORDER grouped BY group;");
+        }
         pigServer.registerQuery("selected = LIMIT grouped 2;");
         pigServer.registerQuery("flattened = FOREACH selected GENERATE FLATTEN 
(data);");
 
@@ -1587,7 +1597,9 @@ public class TestEvalPipeline2 {
 
         String[] expected = new String[] {"(1,A)", "(1,B)", "(2,C)"};
 
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, 
org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")));
+        Util.checkQueryOutputs(iter, expected,
+            
org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")),
 
+            Util.isSparkExecType(cluster.getExecType()));
     }
 
     // See PIG-2237
@@ -1650,8 +1662,14 @@ public class TestEvalPipeline2 {
                     return false;
                 }
             });
-            // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
-            Assert.assertTrue(partFiles.length >= 2);
+
+            if (Util.isSparkExecType(cluster.getExecType())) {
+                // TODO: Fix this when we implement auto-parallelism in Spark
+                Assert.assertTrue(partFiles.length == 1);
+            } else {
+                // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
+                Assert.assertTrue(partFiles.length >= 2);
+            }
             // Check the output
             Iterator<Tuple> iter = job.getResults();
             List<Tuple> results = new ArrayList<Tuple>();

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java Mon May 29 
15:00:39 2017
@@ -37,6 +37,7 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.Collections;
 
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Appender;
@@ -884,7 +885,7 @@ public class TestEvalPipelineLocal {
         int LOOP_COUNT = 10;
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
-        for(int i = 0; i < LOOP_COUNT; i++) {
+        for(int i=0; i<LOOP_COUNT; i++) {
             for(int j=0;j<LOOP_COUNT;j+=2){
                 ps.println(i+"\t"+j);
                 ps.println(i+"\t"+j);
@@ -908,18 +909,29 @@ public class TestEvalPipelineLocal {
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
         int numIdentity = 0;
+        // When running with spark, output can be in a different order than 
that 
+        // when running in mr mode.
+        List<Tuple> resList = new ArrayList<Tuple>();
         while(iter.hasNext()){
-            Tuple t = iter.next();
-            Assert.assertEquals((Integer)((numIdentity + 1) * 10), 
(Integer)t.get(0));
+            resList.add(iter.next());
+        }
+
+        numIdentity = resList.size();
+        Util.sortQueryOutputsIfNeed(resList, 
Util.isSparkExecType(Util.getLocalTestMode()));
+        Assert.assertEquals(LOOP_COUNT, numIdentity);
+        // Since delta differences in some cases are allowed, utility function 
+        // to compare tuple-lists cannot be used here.
+        // This loop generates sorted expected data
+        for (int i=0; i<numIdentity; i++) {
+            Tuple t = resList.get(i);
+            Assert.assertEquals((Integer)((i + 1) * 10), (Integer)t.get(0));
             Assert.assertEquals((Long)10L, (Long)t.get(1));
             Assert.assertEquals((Long)5L, (Long)t.get(2));
             Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
             Assert.assertEquals(8.0, (Double)t.get(5), 0.01);
             Assert.assertEquals(5L, ((DataBag)t.get(6)).size());
             Assert.assertEquals(7, t.size());
-            ++numIdentity;
         }
-        Assert.assertEquals(LOOP_COUNT, numIdentity);
     }
 
     @Test
@@ -927,12 +939,25 @@ public class TestEvalPipelineLocal {
         int LOOP_COUNT = 10;
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+
+        List<Tuple> expectedList = new ArrayList<Tuple>();
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j=0;j<LOOP_COUNT;j+=2){
                 ps.println(i+"\t"+j);
                 ps.println(i+"\t"+j);
+                //  Generating expected data
+                Tuple t = mTf.newTuple();
+                t.append(new Double(j - i));
+                t.append((Integer)(j%2));
+                if(j == 0) {
+                    t.append(0.0);
+                } else {
+                    t.append((Double)((double)i/j));
+                }
+                expectedList.add(t);
             }
         }
+        Util.sortQueryOutputsIfNeed(expectedList, 
Util.isSparkExecType(Util.getLocalTestMode()));
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
@@ -949,25 +974,30 @@ public class TestEvalPipelineLocal {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
+        // When running with spark, output can be in a different order than 
when
+        // running in mr mode.
+        List<Tuple> resList = new ArrayList<Tuple>();
+        while(iter.hasNext()){
+            resList.add(iter.next());
+        }
 
-        int numRows = 0;
+        Util.sortQueryOutputsIfNeed(resList, 
Util.isSparkExecType(Util.getLocalTestMode()));
+        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, resList.size());
+
+        // Since delta difference in some cases is allowed, utility function 
+        // to compare tuple-lists cannot be used here.
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j = 0; j < LOOP_COUNT; j+=2){
-                Tuple t = null;
-                if(iter.hasNext()) t = iter.next();
-                Assert.assertEquals(3, t.size());
-                Assert.assertEquals(new Double(j - i), (Double)t.get(0), 0.01);
-                Assert.assertEquals((Integer)(j%2), (Integer)t.get(1));
-                if(j == 0) {
-                    Assert.assertEquals(0.0, (Double)t.get(2), 0.01);
-                } else {
-                    Assert.assertEquals((Double)((double)i/j), 
(Double)t.get(2), 0.01);
-                }
-                ++numRows;
+                int k = i*LOOP_COUNT/2 + j/2;
+                Tuple res_t = resList.get(k);
+                Tuple expec_t = expectedList.get(k);
+
+                Assert.assertEquals(expec_t.size(), res_t.size());
+                Assert.assertEquals((Double)expec_t.get(0), 
(Double)res_t.get(0), 0.01);
+                Assert.assertEquals((Integer)expec_t.get(1), 
(Integer)res_t.get(1));
+                Assert.assertEquals((Double)expec_t.get(2), 
(Double)res_t.get(2), 0.01);
             }
         }
-
-        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
     }
 
     @Test
@@ -975,10 +1005,16 @@ public class TestEvalPipelineLocal {
         int LOOP_COUNT = 10;
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        List<Tuple> expectedList = new ArrayList<Tuple>();
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j=0;j<LOOP_COUNT;j+=2){
                 ps.println(i+"\t"+j);
                 ps.println(i+"\t"+j);
+                // Generating expected data.
+                Tuple t = mTf.newTuple();
+                t.append(new Double(i+j));
+                t.append(new Double(i + j + i));
+                expectedList.add(t);
             }
         }
         ps.close();
@@ -996,20 +1032,9 @@ public class TestEvalPipelineLocal {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-
-        int numRows = 0;
-        for(int i = 0; i < LOOP_COUNT; i++) {
-            for(int j = 0; j < LOOP_COUNT; j+=2){
-                Tuple t = null;
-                if(iter.hasNext()) t = iter.next();
-                Assert.assertEquals(2, t.size());
-                Assert.assertEquals(new Double(i + j), (Double)t.get(0), 0.01);
-                Assert.assertEquals(new Double(i + j + i), (Double)t.get(1));
-                ++numRows;
-            }
-        }
-
-        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
+        // When running with spark, output can be in a different order than 
that
+        // when running in mr mode.
+        Util.checkQueryOutputs(iter, expectedList, 
Util.isSparkExecType(Util.getLocalTestMode()));
     }
 
     @Test
@@ -1128,9 +1153,15 @@ public class TestEvalPipelineLocal {
         pigServer.registerQuery("b = foreach a generate TOTUPLE(x, y) as t, 
z;");
         pigServer.registerQuery("c = group b by t;");
         Iterator<Tuple> iter = pigServer.openIterator("c");
-        
Assert.assertTrue(iter.next().toString().equals("((1,2),{((1,2),3)})"));
-        
Assert.assertTrue(iter.next().toString().equals("((4,5),{((4,5),6)})"));
-        Assert.assertFalse(iter.hasNext());
+        // When running with spark, output can be in a different order than 
that 
+        // when running in mr mode.
+        List<Tuple> expectedRes =
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[] {
+                                "((1,2),{((1,2),3)})",
+                                "((4,5),{((4,5),6)})"
+                        });
+        Util.checkQueryOutputs(iter, expectedRes, 
Util.isSparkExecType(Util.getLocalTestMode()));
     }
     
     @Test
@@ -1297,7 +1328,7 @@ public class TestEvalPipelineLocal {
 
         logger.removeAppender(appender);
 
-        Assert.assertTrue(bos.toString().contains("New For 
Each(false,false)[tuple]"));
+        Assert.assertTrue(bos.toString().contains("New For Each"));
     }
 
     @Test

Modified: pig/trunk/test/org/apache/pig/test/TestFinish.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFinish.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFinish.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFinish.java Mon May 29 15:00:39 2017
@@ -45,7 +45,7 @@ public class TestFinish {
     BagFactory mBf = BagFactory.getInstance();
     File f1;
 
-    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+    private static MiniGenericCluster cluster = 
MiniGenericCluster.buildCluster();
 
     static public class MyEvalFunction extends EvalFunc<Tuple> {
         String execType;

Modified: pig/trunk/test/org/apache/pig/test/TestFlatten.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFlatten.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFlatten.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFlatten.java Mon May 29 15:00:39 2017
@@ -95,6 +95,6 @@ public class TestFlatten {
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                 new String[] {
                 "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', 
'1', '2')", "('c', 'd', '3', '4')" });
-        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+        Util.checkQueryOutputsAfterSort(actualResults.iterator(), 
expectedResults);
     }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java 
(original)
+++ pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Mon May 
29 15:00:39 2017
@@ -101,14 +101,8 @@ public class TestForEachNestedPlanLocal
         pig.registerQuery("D = foreach C {"
                 + "crossed = cross user, session;"
                 + "generate crossed;" + "}");
-        Iterator<Tuple> expectedItr = expectedResults.iterator();
         Iterator<Tuple> actualItr = pig.openIterator("D");
-        while (expectedItr.hasNext() && actualItr.hasNext()) {
-            Tuple expectedTuple = expectedItr.next();
-            Tuple actualTuple = actualItr.next();
-            assertEquals(expectedTuple, actualTuple);
-        }
-        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+        Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
     }
 
     @Test
@@ -156,14 +150,8 @@ public class TestForEachNestedPlanLocal
                 + "crossed = cross user, distinct_session;"
                 + "filtered = filter crossed by user::region == 
distinct_session::region;"
                 + "generate filtered;" + "}");
-        Iterator<Tuple> expectedItr = expectedResults.iterator();
         Iterator<Tuple> actualItr = pig.openIterator("D");
-        while (expectedItr.hasNext() && actualItr.hasNext()) {
-            Tuple expectedTuple = expectedItr.next();
-            Tuple actualTuple = actualItr.next();
-            assertEquals(expectedTuple, actualTuple);
-        }
-        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+        Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
     }
 
     @Test
@@ -187,14 +175,8 @@ public class TestForEachNestedPlanLocal
         pig.registerQuery("D = foreach C {"
                 + "crossed = cross user, session, profile;"
                 + "generate crossed;" + "}");
-        Iterator<Tuple> expectedItr = expectedResults.iterator();
         Iterator<Tuple> actualItr = pig.openIterator("D");
-        while (expectedItr.hasNext() && actualItr.hasNext()) {
-            Tuple expectedTuple = expectedItr.next();
-            Tuple actualTuple = actualItr.next();
-            assertEquals(expectedTuple, actualTuple);
-        }
-        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+        Util.checkQueryOutputsAfterSort(actualItr, expectedResults);
     }
 
     /*

Modified: pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestGrunt.java Mon May 29 15:00:39 2017
@@ -70,7 +70,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestGrunt {
-
     static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
     private String basedir = "test/org/apache/pig/test/data";
 
@@ -915,6 +914,15 @@ public class TestGrunt {
 
     @Test
     public void testKeepGoigFailed() throws Throwable {
+        // in mr mode, the output file 'baz' will be automatically deleted if 
the mr job fails
+        // when "cat baz;" is executed, it throws "Encountered IOException. 
Directory baz does not exist"
+        // in GruntParser#processCat() and variable "caught" is true
+        // in spark mode, the output file 'baz' will not be automatically 
deleted even if the job fails (see SPARK-7953)
+        // when "cat baz;" is executed, it does not throw exception and the 
variable "caught" is false
+        // TODO: Enable this for Spark when SPARK-7953 is resolved
+        Assume.assumeTrue(
+            "Skip this test for Spark until SPARK-7953 is resolved!",
+            !Util.isSparkExecType(cluster.getExecType()));
         PigServer server = new PigServer(cluster.getExecType(), 
cluster.getProperties());
         PigContext context = server.getPigContext();
         Util.copyFromLocalToCluster(cluster, 
"test/org/apache/pig/test/data/passwd", "passwd");
@@ -936,7 +944,6 @@ public class TestGrunt {
         InputStreamReader reader = new InputStreamReader(cmd);
 
         Grunt grunt = new Grunt(new BufferedReader(reader), context);
-
         boolean caught = false;
         try {
             grunt.exec();
@@ -1004,7 +1011,13 @@ public class TestGrunt {
             grunt.exec();
         } catch (PigException e) {
             caught = true;
-            assertTrue(e.getErrorCode() == 6017);
+            if (!Util.isSparkExecType(cluster.getExecType())) {
+                assertTrue(e.getErrorCode() == 6017);
+            } else {
+                // In spark mode, the ExecException is wrapped in a RuntimeException 
that is thrown from JobGraphBuilder#sparkOperToRDD,
+                // so unwrap the exception here
+                assertTrue(((ExecException) e.getCause()).getErrorCode() == 
6017);
+            }
         }
 
         if (Util.isMapredExecType(cluster.getExecType())) {
@@ -1621,7 +1634,7 @@ public class TestGrunt {
         boolean found = false;
         for (String line : lines) {
             if (line.matches(".*Added jar .*" + jarName + ".*")) {
-                // MR mode
+                // MR and Spark mode
                 found = true;
             } else if (line.matches(".*Local resource.*" + jarName + ".*")) {
                 // Tez mode

Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Mon May 29 
15:00:39 2017
@@ -18,6 +18,7 @@ package org.apache.pig.test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +46,7 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -1443,23 +1445,20 @@ public class TestHBaseStorage {
 
         Iterator<Tuple> it = pig.openIterator("c");
         int index = 0;
-        while (it.hasNext()) {
-            Tuple t = it.next();
-            String rowKey = (String) t.get(0);
-            int col_a = (Integer) t.get(1);
-            Assert.assertNotNull(t.get(2));
-            double col_b = (Double) t.get(2);
-            String col_c = (String) t.get(3);
-
-            Assert.assertEquals("00".substring((index + "").length()) + index,
-                    rowKey);
-            Assert.assertEquals(index, col_a);
-            Assert.assertEquals(index + 0.0, col_b, 1e-6);
-            Assert.assertEquals("Text_" + index, col_c);
+        List<Tuple> expected = new ArrayList<Tuple>();
+        while (index<TEST_ROW_COUNT) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append("00".substring((index + "").length()) + index);
+            t.append(index);
+            t.append(index + 0.0);
+            t.append("Text_" + index);
+            t.append(index);
+            t.append(new DataByteArray("Text_" + index));
             index++;
+            expected.add(t);
         }
-        Assert.assertEquals(index, TEST_ROW_COUNT);
-    }
+        Util.checkQueryOutputsAfterSort(it, expected);
+}
 
     @Test
     // See PIG-4151

Modified: pig/trunk/test/org/apache/pig/test/TestLimitVariable.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLimitVariable.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLimitVariable.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLimitVariable.java Mon May 29 
15:00:39 2017
@@ -80,15 +80,17 @@ public class TestLimitVariable {
         Iterator<Tuple> it = pigServer.openIterator("g");
 
         List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new 
String[] {
-                "(5.0,36)"});
-        Util.checkQueryOutputs(it, expectedRes);
+                "(5.0,36L)"});
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
         
pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_EXEC_MAP_PARTAGG);
     }
 
     @Test
     public void testLimitVariable2() throws IOException {
+        // Field types are declared here so that Util.checkQueryOutputsAfterSort 
can compare the expected and actual
+        // results.
         String query =
-            "a = load '" + inputFile.getName() + "' as (id, num);" +
+            "a = load '" + inputFile.getName() + "' as (id:int, num:int);" +
             "b = filter a by id == 2;" + // only 1 tuple returned (2,3)
             "c = order a by id ASC;" +
             "d = limit c b.num;" + // test bytearray to long implicit cast
@@ -99,18 +101,20 @@ public class TestLimitVariable {
         Iterator<Tuple> itD = pigServer.openIterator("d");
         List<Tuple> expectedResD = Util.getTuplesFromConstantTupleStrings(new 
String[] {
                 "(1,11)", "(2,3)", "(3,10)" });
-        Util.checkQueryOutputs(itD, expectedResD);
+        Util.checkQueryOutputsAfterSort(itD, expectedResD);
 
         Iterator<Tuple> itE = pigServer.openIterator("e");
         List<Tuple> expectedResE = Util.getTuplesFromConstantTupleStrings(new 
String[] {
                 "(1,11)", "(2,3)", "(3,10)", "(4,11)", "(5,10)", "(6,15)" });
-        Util.checkQueryOutputs(itE, expectedResE);
+        Util.checkQueryOutputsAfterSort(itE, expectedResE);
     }
 
     @Test
     public void testLimitVariable3() throws IOException {
+        // Field types are declared here so that Util.checkQueryOutputsAfterSort 
can compare the expected and actual
+        // results.
         String query =
-            "a = load '" + inputFile.getName() + "' ;" +
+            "a = load '" + inputFile.getName() + "' as (id:int, num:int);" +
             "b = group a all;" +
             "c = foreach b generate COUNT(a) as sum;" +
             "d = order a by $0 ASC;" +
@@ -121,7 +125,7 @@ public class TestLimitVariable {
         Iterator<Tuple> itE = pigServer.openIterator("e");
         List<Tuple> expectedResE = Util.getTuplesFromConstantTupleStrings(new 
String[] {
                 "(1,11)", "(2,3)", "(3,10)", "(4,11)", "(5,10)", "(6,15)" });
-        Util.checkQueryOutputs(itE, expectedResE);
+        Util.checkQueryOutputsAfterSort(itE, expectedResE);
     }
 
     @Test

Modified: pig/trunk/test/org/apache/pig/test/TestLineageFindRelVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLineageFindRelVisitor.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLineageFindRelVisitor.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLineageFindRelVisitor.java Mon May 
29 15:00:39 2017
@@ -259,9 +259,10 @@ public class TestLineageFindRelVisitor {
         pig.registerQuery("E = FOREACH D GENERATE (chararray) tupleD.a1;\n");
         Iterator<Tuple> iter  = pig.openIterator("E");
 
-        Assert.assertEquals("123", iter.next().get(0));
-        Assert.assertEquals("456", iter.next().get(0));
-        Assert.assertEquals("789", iter.next().get(0));
+        Util.checkQueryOutputs(iter,
+                new String[]{"(123)", "(456)", "(789)"},
+                
org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("E")), 
Util.isSparkExecType(Util
+                .getLocalTestMode()));
     }
 
     @Test

Modified: pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java Mon May 29 
15:00:39 2017
@@ -311,12 +311,11 @@ public class TestMapSideCogroup {
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "," + INPUT_FILE4 
+ "' using "+ DummyCollectableLoader.class.getName() +"() as 
(c1:chararray,c2:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' using "+ 
DummyIndexableLoader.class.getName()   +"() as (c1:chararray,c2:int);");
 
-        DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
+        List<Tuple> dbMergeCogrp = new ArrayList<Tuple>();
 
         pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';");
         Iterator<Tuple> iter = pigServer.openIterator("C");
 
-
         while(iter.hasNext()) {
             Tuple t = iter.next();
             dbMergeCogrp.add(t);
@@ -335,12 +334,29 @@ public class TestMapSideCogroup {
                 "(3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)})"
         };
 
-        assertEquals(9, dbMergeCogrp.size());
+        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(results);
+
+        // dbMergeCogrp needs to be sorted because the order of the results differs 
between Spark and the other modes when
+        // multiple files are loaded (LOAD INPUT_FILE1,INPUT_FILE4...)
+        for (Tuple t : dbMergeCogrp) {
+            Util.convertBagToSortedBag(t);
+        }
+        for (Tuple t : expected) {
+            Util.convertBagToSortedBag(t);
+        }
+
+        Collections.sort(dbMergeCogrp);
+        Collections.sort(expected);
+        assertEquals(dbMergeCogrp.size(), expected.size());
+
+        //Since TestMapSideCogroup.DummyIndexableLoader.getNext() does not
+        //apply schema for each input 
tuple,Util#checkQueryOutputsAfterSortRecursive fails to assert.
+        // The schema for C is 
(int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,int),(chararray,int),(chararray,int)}).
+        //But the schema for result "dbMergeCogrp" is 
(int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,chararray),(chararray,chararray),(chararray,chararray)})
         Iterator<Tuple> itr = dbMergeCogrp.iterator();
-        for(int i=0; i<9; i++){
-            assertEquals(itr.next().toString(), results[i]);   
+        for (int i = 0; i < dbMergeCogrp.size(); i++) {
+            assertEquals(itr.next().toString(), expected.get(i).toString());
         }
-        assertFalse(itr.hasNext());
     }
 
     @Test

Modified: pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java Mon May 29 
15:00:39 2017
@@ -168,7 +168,7 @@ public class TestMergeJoinOuter {
     
     @Test
     public void testLeftOuter() throws IOException {
-        
+
         pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ 
DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
         pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ 
DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
 
@@ -197,7 +197,7 @@ public class TestMergeJoinOuter {
     
     @Test
     public void testRightOuter() throws IOException{
-        
+
         pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ 
DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
         pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ 
DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
         pigServer.registerQuery("C = join A by c1 right, B by c1 using 
'merge';");
@@ -224,7 +224,7 @@ public class TestMergeJoinOuter {
     
     @Test
     public void testFullOuter() throws IOException{
-        
+
         pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ 
DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
         pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ 
DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
         pigServer.registerQuery("C = join A by c1 full, B by c1 using 
'merge';");

Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Mon May 29 15:00:39 
2017
@@ -31,6 +31,9 @@ import org.apache.pig.backend.executione
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.pigstats.InputStats;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -106,33 +109,25 @@ public class TestMultiQuery {
         myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
         Iterator<Tuple> iter = myPig.openIterator("E");
 
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
+        String[] expectedResults = new String[]{
                         "(1,2)",
                         "(2,3)"
-                });
+        };
+        Schema s = myPig.dumpSchema("E");
+        Util.checkQueryOutputs(iter, expectedResults, 
org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+                .isSparkExecType(Util.getLocalTestMode()));
 
-        int counter = 0;
-        while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());
-        }
-        assertEquals(expectedResults.size(), counter);
 
         myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
         iter = myPig.openIterator("E");
 
-        expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
+        expectedResults = new String[]{
                         "(2,3)",
                         "(3,4)"
-                });
-
-        counter = 0;
-        while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());
-        }
-
-        assertEquals(expectedResults.size(), counter);
+        };
+        s = myPig.dumpSchema("E");
+        Util.checkQueryOutputs(iter, expectedResults, 
org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+                .isSparkExecType(Util.getLocalTestMode()));
     }
 
     @Test
@@ -165,20 +160,15 @@ public class TestMultiQuery {
 
         Iterator<Tuple> iter = myPig.openIterator("F");
 
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
+        String[] expectedResults = new String[]{
                         "(1,2)",
                         "(2,3)",
                         "(3,5)",
                         "(5,6)"
-                });
-
-        int counter = 0;
-        while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());
-        }
-
-        assertEquals(expectedResults.size(), counter);
+        };
+        Schema s = myPig.dumpSchema("F");
+        Util.checkQueryOutputs(iter, expectedResults, 
org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+                .isSparkExecType(Util.getLocalTestMode()));
     }
 
     @Test
@@ -299,19 +289,14 @@ public class TestMultiQuery {
 
         Iterator<Tuple> iter = myPig.openIterator("E");
 
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
-                        "(1L,'apple',3,1L,'apple',1L,{(1L)})",
-                        "(2L,'orange',4,2L,'orange',2L,{(2L)})",
-                        "(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
-                });
-
-        int counter = 0;
-        while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());
-        }
-
-        assertEquals(expectedResults.size(), counter);
+        String[] expectedResults = new String[]{
+                "(1L,apple,3,1L,apple,1L,{(1L)})",
+                "(2L,orange,4,2L,orange,2L,{(2L)})",
+                "(3L,persimmon,5,3L,persimmon,3L,{(3L)})"
+        };
+        Schema s = myPig.dumpSchema("E");
+        Util.checkQueryOutputs(iter, expectedResults, 
org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+                .isSparkExecType(Util.getLocalTestMode()));
     }
 
     @Test
@@ -345,19 +330,14 @@ public class TestMultiQuery {
 
         Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
 
-        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                new String[] {
-                        "('apple',{},{('apple','jar',1L)})",
-                        "('orange',{},{('orange','box',1L)})",
-                        "('strawberry',{(30,'strawberry','quit','bot')},{})"
-                });
-
-        int counter = 0;
-        while (iter.hasNext()) {
-            assertEquals(expectedResults.get(counter++).toString(), 
iter.next().toString());
-        }
-
-        assertEquals(expectedResults.size(), counter);
+        String[] expectedResults = new String[]{
+                "(apple,{},{(apple,jar,1L)})",
+                "(orange,{},{(orange,box,1L)})",
+                "(strawberry,{(30,strawberry,quit,bot)},{})"};
+
+        Schema s = myPig.dumpSchema("joined_session_info");
+        Util.checkQueryOutputs(iter, expectedResults, 
org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+                .isSparkExecType(Util.getLocalTestMode()));
     }
 
     @Test
@@ -861,12 +841,12 @@ public class TestMultiQuery {
         List<Tuple> actualResults = data.get("output1");
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
                 new String[] {"(1)", "(2)"});
-        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+        Util.checkQueryOutputsAfterSort(actualResults.iterator(), 
expectedResults);
 
         actualResults = data.get("output2");
         expectedResults = Util.getTuplesFromConstantTupleStrings(
                 new String[] {"(1, 'world')", "(2, 'world')"});
-        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+        Util.checkQueryOutputsAfterSort(actualResults.iterator(), 
expectedResults);
     }
 
     @Test
@@ -908,6 +888,27 @@ public class TestMultiQuery {
     }
 
     @Test
+    public void testMultiQueryJiraPig4899() throws Exception {
+        myPig.setBatchOn();
+
+        myPig.registerQuery("a = load 'passwd' "
+                + "using PigStorage(':') as (uname:chararray, 
passwd:chararray, uid:int, gid:int);");
+        myPig.registerQuery("b1 = foreach a generate uname;");
+        myPig.registerQuery("b2 = foreach a generate uid;");
+        myPig.registerQuery("store b1 into 'output1';");
+        myPig.registerQuery("store b2 into 'output2';");
+
+        List<ExecJob> jobs = myPig.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            List<InputStats> stats = job.getStatistics().getInputStats();
+            assertEquals(1,stats.size());
+            InputStats stat = stats.get(0);
+            assertEquals("Number of records in passwd file is 
14",14,stat.getNumberRecords());
+        }
+    }
+
+    @Test
     public void testMultiQueryJiraPig4883() throws Exception {
         Storage.Data data = Storage.resetData(myPig);
         data.set("inputLocation",
@@ -934,20 +935,24 @@ public class TestMultiQuery {
 
         List<Tuple> actualResults = data.get("output1");
         String[] expectedResults = new String[]{"(12, 1)"};
-        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), 
expectedResults, 
org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("B1")));
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, 
org.apache.pig.newplan
+                .logical.Util.translateSchema(myPig.dumpSchema("B1")), 
Util.isSparkExecType(Util.getLocalTestMode()));
 
 
         actualResults = data.get("output2");
         expectedResults = new String[]{"(c,1)"};
-        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), 
expectedResults, 
org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("B2")));
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, 
org.apache.pig.newplan.logical.Util
+                .translateSchema(myPig.dumpSchema("B2")), 
Util.isSparkExecType(Util.getLocalTestMode()));
 
         actualResults = data.get("output3");
         expectedResults = new String[]{"(-12, 1)"};
-        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), 
expectedResults, 
org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("C1")));
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, 
org.apache.pig.newplan.logical.Util
+                .translateSchema(myPig.dumpSchema("C1")), 
Util.isSparkExecType(Util.getLocalTestMode()));
 
         actualResults = data.get("output4");
         expectedResults = new String[]{"(d,1)"};
-        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), 
expectedResults, 
org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("C2")));
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, 
org.apache.pig.newplan.logical.Util
+                .translateSchema(myPig.dumpSchema("C2")), 
Util.isSparkExecType(Util.getLocalTestMode()));
     }
 
     // 
--------------------------------------------------------------------------

Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Mon May 29 
15:00:39 2017
@@ -487,6 +487,7 @@ public class TestMultiQueryLocal {
     public void testMultiQueryWithIllustrate() throws Exception {
 
         Assume.assumeTrue("illustrate does not work in tez (PIG-3993)", 
!Util.getLocalTestMode().toString().startsWith("TEZ"));
+        Assume.assumeTrue("illustrate does not work in spark (PIG-4621)", 
!Util.getLocalTestMode().toString().startsWith("SPARK"));
         System.out.println("===== test multi-query with illustrate =====");
 
         try {

Modified: pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Mon May 29 
15:00:39 2017
@@ -29,6 +29,7 @@ import java.util.List;
 
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
@@ -210,6 +211,13 @@ public class TestNativeMapReduce  {
         } catch (JobCreationException e) {
             // Running in Tez mode throw exception
             assertTrue(e.getCause() instanceof FileAlreadyExistsException);
+        } catch (ExecException e) {
+            // Running in spark mode throw exception
+            if (e.getCause() instanceof RuntimeException) {
+                RuntimeException re = (RuntimeException) e.getCause();
+                JobCreationException jce = (JobCreationException) 
re.getCause();
+                assertTrue(jce.getCause() instanceof 
FileAlreadyExistsException);
+            }
         }
         finally{
             // We have to manually delete intermediate mapreduce files

Modified: pig/trunk/test/org/apache/pig/test/TestNullConstant.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNullConstant.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNullConstant.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNullConstant.java Mon May 29 
15:00:39 2017
@@ -109,16 +109,7 @@ public class TestNullConstant {
         pigServer.registerQuery("d = foreach c generate flatten((SIZE(a) == 0 
? null: a)), flatten((SIZE(b) == 0 ? null : b));");
         Iterator<Tuple> it = pigServer.openIterator("d");
         Object[][] results = new Object[][]{{10, "will_join", 10, 
"will_join"}, {11, "will_not_join", null}, {null, 12, "will_not_join"}};
-        int i = 0;
-        while(it.hasNext()) {
-
-            Tuple t = it.next();
-            Object[] result = results[i++];
-            assertEquals(result.length, t.size());
-            for (int j = 0; j < result.length; j++) {
-                assertEquals(result[j], t.get(j));
-            }
-        }
+        Util.checkQueryOutputsAfterSort(it,results);
     }
 
     @Test


Reply via email to