Author: szita
Date: Wed Dec 13 10:29:18 2017
New Revision: 1817995

URL: http://svn.apache.org/viewvc?rev=1817995&view=rev
Log:
PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/test/org/apache/pig/test/TestAssert.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/trunk/test/org/apache/pig/test/TestGrunt.java
    pig/trunk/test/org/apache/pig/test/TestScalarAliases.java
    pig/trunk/test/org/apache/pig/test/TestStoreBase.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1817995&r1=1817994&r2=1817995&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Dec 13 10:29:18 2017
@@ -60,6 +60,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)
+
 PIG-5201: Null handling on FLATTEN (knoguchi)
 
 PIG-5315: pig.script is not set for scripts run via PigServer (satishsaley via rohini)

Modified: pig/trunk/test/org/apache/pig/test/TestAssert.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAssert.java?rev=1817995&r1=1817994&r2=1817995&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAssert.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAssert.java Wed Dec 13 10:29:18 2017
@@ -25,6 +25,7 @@ import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.List;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.junit.Assert;
 
 import org.apache.pig.PigServer;
@@ -118,7 +119,7 @@ public class TestAssert {
       } catch (FrontendException fe) {
           if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ")
                   || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) {
-              Assert.assertTrue(fe.getCause().getMessage().contains(
+              Assert.assertTrue(ExceptionUtils.getRootCause(fe).getMessage().contains(
                       "Assertion violated: i should be greater than 1"));
           } else {
               Assert.assertTrue(fe.getCause().getMessage().contains(
@@ -150,7 +151,7 @@ public class TestAssert {
       } catch (FrontendException fe) {
           if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ")
                   || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) {
-              Assert.assertTrue(fe.getCause().getMessage().contains(
+              Assert.assertTrue(ExceptionUtils.getRootCause(fe).getMessage().contains(
                       "Assertion violated: i should be greater than 1"));
           } else {
               Assert.assertTrue(fe.getCause().getMessage().contains(

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=1817995&r1=1817994&r2=1817995&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Wed Dec 13 10:29:18 2017
@@ -24,7 +24,6 @@ import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1817995&r1=1817994&r2=1817995&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Wed Dec 13 10:29:18 2017
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -1459,7 +1460,8 @@ public class TestEvalPipeline2 {
                 pigServer.openIterator("b");
                 Assert.fail();
             } catch (Exception e) {
-                Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName()));
+                Assert.assertTrue(ExceptionUtils.getRootCause(e).getMessage().contains(
+                        "Unexpected data type " + ArrayList.class.getName() + " found in stream."));
             }
         }
         finally {

Modified: pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=1817995&r1=1817994&r2=1817995&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestGrunt.java Wed Dec 13 10:29:18 2017
@@ -915,16 +915,7 @@ public class TestGrunt {
     }
 
     @Test
-    public void testKeepGoigFailed() throws Throwable {
-        // in mr mode, the output file 'baz' will be automatically deleted if the mr job fails
-        // when "cat baz;" is executed, it throws "Encountered IOException. Directory baz does not exist"
-        // in GruntParser#processCat() and variable "caught" is true
-        // in spark mode, the output file 'baz' will not be automatically deleted even the job fails(see SPARK-7953)
-        // when "cat baz;" is executed, it does not throw exception and the variable "caught" is false
-        // TODO: Enable this for Spark when SPARK-7953 is resolved
-        Assume.assumeTrue(
-            "Skip this test for Spark until SPARK-7953 is resolved!",
-            !Util.isSparkExecType(cluster.getExecType()));
+    public void testKeepGoingFailed() throws Throwable {
         PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
         PigContext context = server.getPigContext();
         Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "passwd");

Modified: pig/trunk/test/org/apache/pig/test/TestScalarAliases.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java?rev=1817995&r1=1817994&r2=1817995&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestScalarAliases.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestScalarAliases.java Wed Dec 13 10:29:18 2017
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
 import org.junit.AfterClass;
@@ -108,7 +109,7 @@ public class TestScalarAliases  {
             pigServer.openIterator("C");
             fail("exception expected - scalar input has multiple rows");
         } catch (IOException pe){
-            Util.checkStrContainsSubStr(pe.getCause().getMessage(),
+            Util.checkStrContainsSubStr(ExceptionUtils.getRootCause(pe).getMessage(),
                     "Scalar has more than one row in the output"
             );
         }

Modified: pig/trunk/test/org/apache/pig/test/TestStoreBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreBase.java?rev=1817995&r1=1817994&r2=1817995&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStoreBase.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStoreBase.java Wed Dec 13 10:29:18 2017
@@ -143,6 +143,8 @@ public abstract class TestStoreBase {
         String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
         String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
 
+        boolean isSpark2_2_plus = Util.isSpark2_2_plus();
+        
         Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
         if (mode.toString().startsWith("SPARK")) {
             filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
@@ -174,13 +176,21 @@ public abstract class TestStoreBase {
             filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
-            // OutputCommitter.abortTask will not be invoked in spark mode. Detail see SPARK-7953
-            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+            if (isSpark2_2_plus) {
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+            } else {
+                // OutputCommitter.abortTask will not be invoked in spark mode before spark 2.2.x. Detail see SPARK-7953
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+            }
             filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
-            // OutputCommitter.abortJob will not be invoked in spark mode. Detail see SPARK-7953
-            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+            if (isSpark2_2_plus) {
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
+            } else {
+                // OutputCommitter.abortJob will not be invoked in spark mode before spark 2.2.x. Detail see SPARK-7953
+                filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+            }
             filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
             filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
@@ -218,8 +228,12 @@ public abstract class TestStoreBase {
 
         if(mode.isLocal()) {
             // MR LocalJobRunner does not call abortTask
-            if (!mode.toString().startsWith("TEZ")) {
-                filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+            if (!Util.isTezExecType(mode)) {
+                if (Util.isSparkExecType(mode) && isSpark2_2_plus) {
+                    filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+                } else {
+                    filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+                }
                 filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
             }
             if (Util.isHadoop1_x()) {

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1817995&r1=1817994&r2=1817995&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Wed Dec 13 10:29:18 2017
@@ -105,6 +105,7 @@ import org.apache.pig.parser.ParserExcep
 import org.apache.pig.parser.QueryParserDriver;
 import org.apache.pig.tools.grunt.GruntParser;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.spark.package$;
 import org.junit.Assert;
 
 import com.google.common.base.Function;
@@ -400,7 +401,7 @@ public class Util {
         }
         FileStatus fileStatus = fs.getFileStatus(path);
         FileStatus[] files;
-        if (fileStatus.isDir()) {
+        if (fileStatus.isDirectory()) {
             files = fs.listStatus(path, new PathFilter() {
                 @Override
                 public boolean accept(Path p) {
@@ -731,7 +732,7 @@ public class Util {
 
         String line = null;
           FileStatus fst = fs.getFileStatus(new Path(fileNameOnCluster));
-          if(fst.isDir()) {
+          if(fst.isDirectory()) {
               throw new IOException("Only files from cluster can be copied locally," +
                        " " + fileNameOnCluster + " is a directory");
           }
@@ -1250,13 +1251,14 @@ public class Util {
         LogicalSchema resultSchema = org.apache.pig.impl.util.Utils.parseSchema(schemaString);
         checkQueryOutputsAfterSortRecursive(actualResultsIt, expectedResArray, resultSchema);
     }
-          /**
+    
+    /**
      * Helper function to check if the result of a Pig Query is in line with
      * expected results. It sorts actual and expected string results before comparison
      *
      * @param actualResultsIt Result of the executed Pig query
      * @param expectedResArray Expected string results to validate against
-     * @param fs fieldSchema of expecteResArray
+     * @param schema fieldSchema of expecteResArray
      * @throws IOException
      */
     static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt,
@@ -1334,6 +1336,11 @@ public class Util {
         return false;
     }
 
+    public static boolean isSpark2_2_plus() throws IOException {
+        String sparkVersion = package$.MODULE$.SPARK_VERSION();
+        return sparkVersion != null && sparkVersion.matches("2\\.([\\d&&[^01]]|[\\d]{2,})\\..*");
+    }
+
     public static void sortQueryOutputsIfNeed(List<Tuple> actualResList, boolean toSort){
         if( toSort == true) {
             for (Tuple t : actualResList) {


Reply via email to