Author: szita
Date: Wed Dec 13 10:29:18 2017
New Revision: 1817995

URL: http://svn.apache.org/viewvc?rev=1817995&view=rev
Log:
PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)

Modified: pig/trunk/CHANGES.txt pig/trunk/test/org/apache/pig/test/TestAssert.java pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java pig/trunk/test/org/apache/pig/test/TestGrunt.java pig/trunk/test/org/apache/pig/test/TestScalarAliases.java pig/trunk/test/org/apache/pig/test/TestStoreBase.java pig/trunk/test/org/apache/pig/test/Util.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1817995&r1=1817994&r2=1817995&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Wed Dec 13 10:29:18 2017 @@ -60,6 +60,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita) + PIG-5201: Null handling on FLATTEN (knoguchi) PIG-5315: pig.script is not set for scripts run via PigServer (satishsaley via rohini) Modified: pig/trunk/test/org/apache/pig/test/TestAssert.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAssert.java?rev=1817995&r1=1817994&r2=1817995&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestAssert.java (original) +++ pig/trunk/test/org/apache/pig/test/TestAssert.java Wed Dec 13 10:29:18 2017 @@ -25,6 +25,7 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.List; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.junit.Assert; import org.apache.pig.PigServer; @@ -118,7 +119,7 @@ public class TestAssert { } catch (FrontendException fe) { if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ") || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) { - Assert.assertTrue(fe.getCause().getMessage().contains( + Assert.assertTrue(ExceptionUtils.getRootCause(fe).getMessage().contains( "Assertion violated: i should be 
greater than 1")); } else { Assert.assertTrue(fe.getCause().getMessage().contains( @@ -150,7 +151,7 @@ public class TestAssert { } catch (FrontendException fe) { if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ") || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) { - Assert.assertTrue(fe.getCause().getMessage().contains( + Assert.assertTrue(ExceptionUtils.getRootCause(fe).getMessage().contains( "Assertion violated: i should be greater than 1")); } else { Assert.assertTrue(fe.getCause().getMessage().contains( Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=1817995&r1=1817994&r2=1817995&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Wed Dec 13 10:29:18 2017 @@ -24,7 +24,6 @@ import java.io.PrintStream; import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1817995&r1=1817994&r2=1817995&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original) +++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Wed Dec 13 10:29:18 2017 @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Properties; import java.util.Random; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -1459,7 +1460,8 @@ public class 
TestEvalPipeline2 { pigServer.openIterator("b"); Assert.fail(); } catch (Exception e) { - Assert.assertTrue(e.getMessage().contains(ArrayList.class.getName())); + Assert.assertTrue(ExceptionUtils.getRootCause(e).getMessage().contains( + "Unexpected data type " + ArrayList.class.getName() + " found in stream.")); } } finally { Modified: pig/trunk/test/org/apache/pig/test/TestGrunt.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=1817995&r1=1817994&r2=1817995&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestGrunt.java (original) +++ pig/trunk/test/org/apache/pig/test/TestGrunt.java Wed Dec 13 10:29:18 2017 @@ -915,16 +915,7 @@ public class TestGrunt { } @Test - public void testKeepGoigFailed() throws Throwable { - // in mr mode, the output file 'baz' will be automatically deleted if the mr job fails - // when "cat baz;" is executed, it throws "Encountered IOException. 
Directory baz does not exist" - // in GruntParser#processCat() and variable "caught" is true - // in spark mode, the output file 'baz' will not be automatically deleted even the job fails(see SPARK-7953) - // when "cat baz;" is executed, it does not throw exception and the variable "caught" is false - // TODO: Enable this for Spark when SPARK-7953 is resolved - Assume.assumeTrue( - "Skip this test for Spark until SPARK-7953 is resolved!", - !Util.isSparkExecType(cluster.getExecType())); + public void testKeepGoingFailed() throws Throwable { PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); PigContext context = server.getPigContext(); Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "passwd"); Modified: pig/trunk/test/org/apache/pig/test/TestScalarAliases.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java?rev=1817995&r1=1817994&r2=1817995&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestScalarAliases.java (original) +++ pig/trunk/test/org/apache/pig/test/TestScalarAliases.java Wed Dec 13 10:29:18 2017 @@ -25,6 +25,7 @@ import java.io.File; import java.io.IOException; import java.util.Iterator; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.pig.PigServer; import org.apache.pig.data.Tuple; import org.junit.AfterClass; @@ -108,7 +109,7 @@ public class TestScalarAliases { pigServer.openIterator("C"); fail("exception expected - scalar input has multiple rows"); } catch (IOException pe){ - Util.checkStrContainsSubStr(pe.getCause().getMessage(), + Util.checkStrContainsSubStr(ExceptionUtils.getRootCause(pe).getMessage(), "Scalar has more than one row in the output" ); } Modified: pig/trunk/test/org/apache/pig/test/TestStoreBase.java URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStoreBase.java?rev=1817995&r1=1817994&r2=1817995&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestStoreBase.java (original) +++ pig/trunk/test/org/apache/pig/test/TestStoreBase.java Wed Dec 13 10:29:18 2017 @@ -143,6 +143,8 @@ public abstract class TestStoreBase { String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; + boolean isSpark2_2_plus = Util.isSpark2_2_plus(); + Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>(); if (mode.toString().startsWith("SPARK")) { filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE); @@ -174,13 +176,21 @@ public abstract class TestStoreBase { filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE); - // OutputCommitter.abortTask will not be invoked in spark mode. Detail see SPARK-7953 - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); + if (isSpark2_2_plus) { + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); + } else { + // OutputCommitter.abortTask will not be invoked in spark mode before spark 2.2.x. Detail see SPARK-7953 + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); + } filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE); - // OutputCommitter.abortJob will not be invoked in spark mode. 
Detail see SPARK-7953 - filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE); + if (isSpark2_2_plus) { + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE); + } else { + // OutputCommitter.abortJob will not be invoked in spark mode before spark 2.2.x. Detail see SPARK-7953 + filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE); + } filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE); filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE); @@ -218,8 +228,12 @@ public abstract class TestStoreBase { if(mode.isLocal()) { // MR LocalJobRunner does not call abortTask - if (!mode.toString().startsWith("TEZ")) { - filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); + if (!Util.isTezExecType(mode)) { + if (Util.isSparkExecType(mode) && isSpark2_2_plus) { + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); + } else { + filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); + } filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE); } if (Util.isHadoop1_x()) { Modified: pig/trunk/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1817995&r1=1817994&r2=1817995&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/Util.java (original) +++ pig/trunk/test/org/apache/pig/test/Util.java Wed Dec 13 10:29:18 2017 @@ -105,6 +105,7 @@ import org.apache.pig.parser.ParserExcep import org.apache.pig.parser.QueryParserDriver; import org.apache.pig.tools.grunt.GruntParser; import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.spark.package$; import org.junit.Assert; import 
com.google.common.base.Function; @@ -400,7 +401,7 @@ public class Util { } FileStatus fileStatus = fs.getFileStatus(path); FileStatus[] files; - if (fileStatus.isDir()) { + if (fileStatus.isDirectory()) { files = fs.listStatus(path, new PathFilter() { @Override public boolean accept(Path p) { @@ -731,7 +732,7 @@ public class Util { String line = null; FileStatus fst = fs.getFileStatus(new Path(fileNameOnCluster)); - if(fst.isDir()) { + if(fst.isDirectory()) { throw new IOException("Only files from cluster can be copied locally," + " " + fileNameOnCluster + " is a directory"); } @@ -1250,13 +1251,14 @@ public class Util { LogicalSchema resultSchema = org.apache.pig.impl.util.Utils.parseSchema(schemaString); checkQueryOutputsAfterSortRecursive(actualResultsIt, expectedResArray, resultSchema); } - /** + + /** * Helper function to check if the result of a Pig Query is in line with * expected results. It sorts actual and expected string results before comparison * * @param actualResultsIt Result of the executed Pig query * @param expectedResArray Expected string results to validate against - * @param fs fieldSchema of expecteResArray + * @param schema fieldSchema of expecteResArray * @throws IOException */ static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt, @@ -1334,6 +1336,11 @@ public class Util { return false; } + public static boolean isSpark2_2_plus() throws IOException { + String sparkVersion = package$.MODULE$.SPARK_VERSION(); + return sparkVersion != null && sparkVersion.matches("2\\.([\\d&&[^01]]|[\\d]{2,})\\..*"); + } + public static void sortQueryOutputsIfNeed(List<Tuple> actualResList, boolean toSort){ if( toSort == true) { for (Tuple t : actualResList) {