Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3-OPTOFF.gld Wed Feb 22 09:43:41 2017 @@ -66,7 +66,7 @@ e: Local Rearrange[tuple]{int}(false) - |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-67 Tez vertex scope-94 # Plan on vertex -e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-86 +e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-86 | |---e: New For Each(true,true)[tuple] - scope-85 | |
Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld Wed Feb 22 09:43:41 2017 @@ -65,7 +65,7 @@ Tez vertex group scope-45 <- [scope-37, # No plan on vertex group Tez vertex scope-44 # Plan on vertex -e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-36 +e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-36 | |---e: New For Each(true,true)[tuple] - scope-35 | | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4-OPTOFF.gld Wed Feb 22 09:43:41 2017 @@ -58,7 +58,7 @@ Local Rearrange[tuple]{int}(false) - sco |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-58 Tez vertex scope-77 # Plan on vertex -e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-74 +e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-74 | |---e: FRJoin[tuple] - scope-68 <- scope-81 | | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-4.gld Wed Feb 22 09:43:41 2017 @@ -58,7 +58,7 @@ Local Rearrange[tuple]{int}(false) - sco |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17 Tez vertex scope-36 # Plan on vertex -e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-33 +e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-33 | |---e: FRJoin[tuple] - scope-27 <- scope-40 | | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5-OPTOFF.gld Wed Feb 22 09:43:41 2017 @@ -48,7 +48,7 @@ Local Rearrange[tuple]{int}(false) - sco |---POShuffledValueInputTez - scope-166 <- [scope-163, scope-164] Tez vertex scope-162 # Plan on vertex -e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-161 +e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-161 | |---e: FRJoin[tuple] - scope-155 <- scope-165 | | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld Wed Feb 22 09:43:41 2017 @@ -47,7 +47,7 @@ Tez vertex group scope-123 <- [scope-11 # No plan on vertex group Tez vertex scope-116 # Plan on vertex -e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-115 +e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-115 | |---e: FRJoin[tuple] - scope-109 <- scope-123 | | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld Wed Feb 22 09:43:41 2017 @@ -48,7 +48,7 @@ Local Rearrange[tuple]{tuple}(false) - s | | | Constant(DummyVal) - scope-122 | -|---New For Each(true,true)[tuple] - scope-127 +|---New For Each(false,true)[tuple] - scope-127 | | | Project[int][0] - scope-110 | | @@ -104,7 +104,7 @@ Partition Rearrange[tuple]{int}(false) - |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-102 Tez vertex scope-142 # Plan on vertex -e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-113 +e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-113 | |---New For Each(true,true)[tuple] - scope-146 | | Modified: 
pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld Wed Feb 22 09:43:41 2017 @@ -19,7 +19,7 @@ Local Rearrange[tuple]{tuple}(false) - s | | | Constant(DummyVal) - scope-68 | -|---New For Each(true,true)[tuple] - scope-74 +|---New For Each(false,true)[tuple] - scope-74 | | | Project[int][0] - scope-71 | | @@ -50,7 +50,7 @@ Local Rearrange[tuple]{tuple}(false) - s | | | Constant(DummyVal) - scope-78 | -|---New For Each(true,true)[tuple] - scope-84 +|---New For Each(false,true)[tuple] - scope-84 | | | Project[int][0] - scope-81 | | @@ -120,7 +120,7 @@ Partition Rearrange[tuple]{int}(false) - |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17 Tez vertex scope-57 # Plan on vertex -e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-28 +e: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-28 | |---New For Each(true,true)[tuple] - scope-61 | | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld Wed Feb 22 09:43:41 2017 @@ -86,7 +86,7 @@ POIdentityInOutTez - scope-118 <- scope | 
Project[int][0] - scope-90 Tez vertex scope-119 # Plan on vertex -d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-92 +d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-92 | |---New For Each(true)[tuple] - scope-122 | | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld Wed Feb 22 09:43:41 2017 @@ -102,7 +102,7 @@ POIdentityInOutTez - scope-45 <- scope- | Project[int][0] - scope-17 Tez vertex scope-46 # Plan on vertex -d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-19 +d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-19 | |---New For Each(true)[tuple] - scope-49 | | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld Wed Feb 22 09:43:41 2017 @@ -64,7 +64,7 @@ POValueOutputTez - scope-73 -> [scope-7 |---POValueInputTez - scope-68 <- scope-65 Tez vertex scope-70 # Plan on vertex -d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-59 +d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-59 | |---d: 
Limit - scope-58 | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld Wed Feb 22 09:43:41 2017 @@ -64,7 +64,7 @@ POValueOutputTez - scope-36 -> [scope-3 |---POValueInputTez - scope-31 <- scope-28 Tez vertex scope-33 # Plan on vertex -d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-22 +d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-22 | |---d: Limit - scope-21 | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld Wed Feb 22 09:43:41 2017 @@ -15,7 +15,7 @@ Tez vertex scope-45 | | | a2: Split - scope-68 | | | -| | a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-17 +| | a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-17 | | | | | POValueOutputTez - scope-60 -> [scope-57] | | @@ -74,7 +74,7 @@ Tez vertex scope-57 # Plan on vertex 1-3: Split - scope-67 | | -| d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-38 +| d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-38 | | | |---d: 
Filter[bag] - scope-34 | | | @@ -84,7 +84,7 @@ Tez vertex scope-57 | | | | | |---Constant(500) - scope-36 | | -| e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-44 +| e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-44 | | | |---e: Filter[bag] - scope-39 | | | Modified: pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld (original) +++ pig/branches/spark/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld Wed Feb 22 09:43:41 2017 @@ -4,7 +4,7 @@ #-------------------------------------------------- # TEZ DAG plan: pig-0_scope-0 #-------------------------------------------------- -Tez vertex scope-45 -> Tez vertex group scope-70,Tez vertex group scope-70,Tez vertex group scope-71,Tez vertex group scope-71, +Tez vertex scope-45 -> Tez vertex group scope-70,Tez vertex group scope-71, Tez vertex scope-56 -> Tez vertex group scope-70,Tez vertex group scope-71, Tez vertex group scope-70 Tez vertex group scope-71 @@ -15,11 +15,11 @@ Tez vertex scope-45 | | | a2: Split - scope-68 | | | -| | a2: Store(file:///tmp/output/a2:org.apache.pig.builtin.PigStorage) - scope-17 +| | a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-17 | | | | | 1-3: Split - scope-72 | | | | -| | | d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-77 -> scope-38 +| | | d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-77 -> scope-38 | | | | | | | |---d: Filter[bag] - scope-73 | | | | | @@ -29,7 +29,7 @@ Tez vertex scope-45 | | | | | | | | | |---Constant(500) - scope-75 | | | | -| | | e: 
Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-83 -> scope-44 +| | | e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-83 -> scope-44 | | | | | | | |---e: Filter[bag] - scope-78 | | | | | @@ -53,7 +53,7 @@ Tez vertex scope-45 | | | 1-3: Split - scope-84 | | | -| | d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-89 -> scope-38 +| | d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-89 -> scope-38 | | | | | |---d: Filter[bag] - scope-85 | | | | @@ -63,7 +63,7 @@ Tez vertex scope-45 | | | | | | | |---Constant(500) - scope-87 | | | -| | e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-95 -> scope-44 +| | e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-95 -> scope-44 | | | | | |---e: Filter[bag] - scope-90 | | | | @@ -98,7 +98,7 @@ Tez vertex scope-56 # Plan on vertex 1-3: Split - scope-96 | | -| d: Store(file:///tmp/output/d:org.apache.pig.builtin.PigStorage) - scope-101 -> scope-38 +| d: Store(file:///tmp/pigoutput/d:org.apache.pig.builtin.PigStorage) - scope-101 -> scope-38 | | | |---d: Filter[bag] - scope-97 | | | @@ -108,7 +108,7 @@ Tez vertex scope-56 | | | | | |---Constant(500) - scope-99 | | -| e: Store(file:///tmp/output/e:org.apache.pig.builtin.PigStorage) - scope-107 -> scope-44 +| e: Store(file:///tmp/pigoutput/e:org.apache.pig.builtin.PigStorage) - scope-107 -> scope-44 | | | |---e: Filter[bag] - scope-102 | | | Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezAutoParallelism.java Wed Feb 22 09:43:41 2017 @@ -36,6 
+36,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; @@ -47,6 +48,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.test.MiniGenericCluster; import org.apache.pig.test.Util; +import org.apache.tez.dag.api.TezConfiguration; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -62,12 +64,23 @@ public class TestTezAutoParallelism { private static Properties properties; private static MiniGenericCluster cluster; + private static final PathFilter PART_FILE_FILTER = new PathFilter() { + @Override + public boolean accept(Path path) { + if (path.getName().startsWith("part")) { + return true; + } + return false; + } + }; + @BeforeClass public static void oneTimeSetUp() throws Exception { cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ); properties = cluster.getProperties(); //Test spilling to disk as tests here have multiple splits properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, "10"); + properties.setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false"); createFiles(); } @@ -84,6 +97,11 @@ public class TestTezAutoParallelism { @After public void tearDown() throws Exception { + removeProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION); + removeProperty(MRConfiguration.MAX_SPLIT_SIZE); + removeProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM); + removeProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART); + removeProperty(TezConfiguration.TEZ_AM_LOG_LEVEL); pigServer.shutdown(); pigServer = null; } @@ -131,32 +149,53 @@ public class TestTezAutoParallelism { @Test public void testGroupBy() 
throws IOException{ // parallelism is 3 originally, reduce to 1 - pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = group A by name;"); pigServer.store("B", "output1"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){ - @Override - public boolean accept(Path path) { - if (path.getName().startsWith("part")) { - return true; - } - return false; - } - }); + FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER); assertEquals(files.length, 1); + fs.delete(new Path("output1"), true); + } + + @Test + public void testBytesPerReducer() throws IOException{ + + NodeIdGenerator.reset(); + PigServer.resetScope(); + + setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000"); + + StringWriter writer = new StringWriter(); + Util.createLogAppender("testAutoParallelism", writer, TezDagBuilder.class); + try { + pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); + pigServer.registerQuery("B = group A by name;"); + pigServer.store("B", "output1"); + FileSystem fs = cluster.getFileSystem(); + FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER); + assertEquals(files.length, 10); + String 
log = writer.toString(); + assertTrue(log.contains("For vertex - scope-13: parallelism=3")); + assertTrue(log.contains("For vertex - scope-14: parallelism=10")); + } finally { + Util.removeLogAppender("testAutoParallelism", TezDagBuilder.class); + Util.deleteFile(cluster, "output1"); + } } @Test public void testOrderbyDecreaseParallelism() throws IOException{ // order by parallelism is 3 originally, reduce to 1 - pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = group A by name parallel 3;"); @@ -164,86 +203,54 @@ public class TestTezAutoParallelism { pigServer.registerQuery("D = order C by age;"); pigServer.store("D", "output2"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){ - @Override - public boolean accept(Path path) { - if (path.getName().startsWith("part")) { - return true; - } - return false; - } - }); + FileStatus[] files = fs.listStatus(new Path("output2"), PART_FILE_FILTER); assertEquals(files.length, 1); } @Test public void testOrderbyIncreaseParallelism() throws IOException{ // order by parallelism is 3 originally, increase to 4 - pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - 
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000"); + setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000"); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = group A by name parallel 3;"); pigServer.registerQuery("C = foreach B generate group as name, AVG(A.age) as age;"); pigServer.registerQuery("D = order C by age;"); pigServer.store("D", "output3"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){ - @Override - public boolean accept(Path path) { - if (path.getName().startsWith("part")) { - return true; - } - return false; - } - }); + FileStatus[] files = fs.listStatus(new Path("output3"), PART_FILE_FILTER); assertEquals(files.length, 4); } @Test public void testSkewedJoinDecreaseParallelism() throws IOException{ // skewed join parallelism is 4 originally, reduce to 1 - pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("C = join A by name, B by name using 'skewed';"); pigServer.store("C", "output4"); FileSystem fs = 
cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){ - @Override - public boolean accept(Path path) { - if (path.getName().startsWith("part")) { - return true; - } - return false; - } - }); + FileStatus[] files = fs.listStatus(new Path("output4"), PART_FILE_FILTER); assertEquals(files.length, 1); } @Test public void testSkewedJoinIncreaseParallelism() throws IOException{ // skewed join parallelism is 3 originally, increase to 5 - pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); + setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("C = join A by name, B by name using 'skewed';"); pigServer.store("C", "output5"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){ - @Override - public boolean accept(Path path) { - if (path.getName().startsWith("part")) { - return true; - } - return false; - } - }); + FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER); assertEquals(files.length, 5); } @@ -251,23 +258,15 @@ public class TestTezAutoParallelism { public void testSkewedFullJoinIncreaseParallelism() throws IOException{ // skewed full join parallelism take the initial setting, since the join vertex has a broadcast(sample) dependency, // which prevent it changing parallelism - 
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); + setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("C = join A by name full, B by name using 'skewed';"); pigServer.store("C", "output6"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){ - @Override - public boolean accept(Path path) { - if (path.getName().startsWith("part")) { - return true; - } - return false; - } - }); + FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER); assertEquals(files.length, 5); } @@ -275,9 +274,9 @@ public class TestTezAutoParallelism { public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{ // skewed join parallelism take the initial setting, since the join vertex has a broadcast(scalar) dependency, // which prevent it changing parallelism - pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); + setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); 
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);"); pigServer.registerQuery("C = join A by name, B by name using 'skewed';"); @@ -287,19 +286,29 @@ public class TestTezAutoParallelism { pigServer.registerQuery("G = foreach C generate age/F.count, gender;"); pigServer.store("G", "output7"); FileSystem fs = cluster.getFileSystem(); - FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){ - @Override - public boolean accept(Path path) { - if (path.getName().startsWith("part")) { - return true; - } - return false; - } - }); + FileStatus[] files = fs.listStatus(new Path("output7"), PART_FILE_FILTER); assertEquals(files.length, 4); } @Test + public void testSkewedJoinRightInputAutoParallelism() throws IOException{ + setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000"); + setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000"); + setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "1.0"); + setProperty(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG"); + pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); + pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"); + pigServer.registerQuery("B = FILTER B by name == 'Noah';"); + pigServer.registerQuery("B1 = group B by name;"); + pigServer.registerQuery("C = join A by name, B1 by group using 'skewed';"); + pigServer.store("C", "output8"); + FileSystem fs = cluster.getFileSystem(); + FileStatus[] files = fs.listStatus(new Path("output8"), PART_FILE_FILTER); + assertEquals(5, files.length); + } + + @Test public void testFlattenParallelism() throws IOException{ String outputDir = "/tmp/testFlattenParallelism"; String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);" @@ -386,9 +395,9 @@ public class 
TestTezAutoParallelism { // When there is a combiner operation involved user specified parallelism is overriden Util.createLogAppender("testAutoParallelism", writer, classesToLog); try { - pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); - pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000"); - pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000"); + setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true"); + setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000"); + setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000"); pigServer.setBatchOn(); pigServer.registerScript(new ByteArrayInputStream(script.getBytes())); pigServer.executeBatch(); @@ -416,4 +425,12 @@ public class TestTezAutoParallelism { Util.deleteFile(cluster, outputDir); } } + + private void setProperty(String property, String value) { + pigServer.getPigContext().getProperties().setProperty(property, value); + } + + private void removeProperty(String property) { + pigServer.getPigContext().getProperties().remove(property); + } } Modified: pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/tez/TestTezCompiler.java Wed Feb 22 09:43:41 2017 @@ -20,13 +20,21 @@ package org.apache.pig.tez; import static org.junit.Assert.assertEquals; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.IOException; import java.io.PrintStream; import java.util.Properties; +import java.util.Random; +import 
org.apache.commons.io.FileUtils; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; +import org.apache.pig.StoreFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; @@ -35,7 +43,9 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter; import org.apache.pig.builtin.OrcStorage; import org.apache.pig.builtin.PigStorage; +import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat; import org.apache.pig.test.Util; @@ -66,11 +76,14 @@ public class TestTezCompiler { @BeforeClass public static void setUpBeforeClass() throws Exception { + resetFileLocalizer(); pc = new PigContext(new TezLocalExecType(), new Properties()); + FileUtils.deleteDirectory(new File("/tmp/pigoutput")); } @AfterClass public static void tearDownAfterClass() throws Exception { + resetFileLocalizer(); } @Before @@ -79,6 +92,7 @@ public class TestTezCompiler { pc.getProperties().remove(PigConfiguration.PIG_OPT_MULTIQUERY); pc.getProperties().remove(PigConfiguration.PIG_TEZ_OPT_UNION); pc.getProperties().remove(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY); + pc.getProperties().remove(PigConfiguration.PIG_BLOOMJOIN_STRATEGY); pigServer = new PigServer(pc); } @@ -88,13 +102,20 @@ public class TestTezCompiler { TezPlanContainer.resetScope(); } + private static void resetFileLocalizer() { + FileLocalizer.deleteTempFiles(); + FileLocalizer.setInitialized(false); + // Set random seed to generate deterministic temporary paths + 
FileLocalizer.setR(new Random(1331L)); + } + @Test public void testStoreLoad() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + - "store a into 'file:///tmp/output';" + - "b = load 'file:///tmp/output' as (x:int, y:int);" + - "store b into 'file:///tmp/output1';"; + "store a into 'file:///tmp/pigoutput';" + + "b = load 'file:///tmp/pigoutput' as (x:int, y:int);" + + "store b into 'file:///tmp/pigoutput1';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-1.gld"); } @@ -103,25 +124,85 @@ public class TestTezCompiler { public void testStoreLoadMultiple() throws Exception { String query = "a = load 'file:///tmp/input';" + - "store a into 'file:///tmp/output/Dir1';" + - "a = load 'file:///tmp/output/Dir1';" + - "store a into 'file:///tmp/output/Dir2' using BinStorage();" + - "a = load 'file:///tmp/output/Dir1';" + - "store a into 'file:///tmp/output/Dir3';" + - "a = load 'file:///tmp/output/Dir2' using BinStorage();" + - "store a into 'file:///tmp/output/Dir4';" + - "a = load 'file:///tmp/output/Dir3';" + - "b = load 'file:///tmp/output/Dir2' using BinStorage();" + - "c = load 'file:///tmp/output/Dir1';" + + "store a into 'file:///tmp/pigoutput/Dir1';" + + "a = load 'file:///tmp/pigoutput/Dir1';" + + "store a into 'file:///tmp/pigoutput/Dir2' using BinStorage();" + + "a = load 'file:///tmp/pigoutput/Dir1';" + + "store a into 'file:///tmp/pigoutput/Dir3';" + + "a = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" + + "store a into 'file:///tmp/pigoutput/Dir4';" + + "a = load 'file:///tmp/pigoutput/Dir3';" + + "b = load 'file:///tmp/pigoutput/Dir2' using BinStorage();" + + "c = load 'file:///tmp/pigoutput/Dir1';" + "d = cogroup a by $0, b by $0, c by $0;" + - "store d into 'file:///tmp/output/Dir5';"; + "store d into 'file:///tmp/pigoutput/Dir5';"; - // To get around difference in ordering of operators in plan due to JDK7 and JDK8 - if 
(System.getProperties().getProperty("java.version").startsWith("1.8")) { - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld"); - } else { - run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2-JDK7.gld"); - } + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-2.gld"); + } + + @Test + public void testStoreLoadJoinMultiple() throws Exception { + // Case where different store load statements are used in a single join + String query = + "a = load 'file:///tmp/pigoutput/Dir1';" + + "b = filter a by $0 == 1;" + + "c = filter a by $0 == 2;" + + "store b into 'file:///tmp/pigoutput/Dir2';" + + "store c into 'file:///tmp/pigoutput/Dir3';" + + "d = load 'file:///tmp/pigoutput/Dir2';" + + "e = load 'file:///tmp/pigoutput/Dir3';" + + "f = join d by $0, e by $0;" + + "store f into 'file:///tmp/pigoutput/Dir5';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-3.gld"); + + resetScope(); + query = + "a = load 'file:///tmp/pigoutput/Dir1';" + + "b = distinct a;" + + "c = group a by $0;" + + "store b into 'file:///tmp/pigoutput/Dir2';" + + "store c into 'file:///tmp/pigoutput/Dir3';" + + "d = load 'file:///tmp/pigoutput/Dir2';" + + "e = load 'file:///tmp/pigoutput/Dir3';" + + "f = load 'file:///tmp/pigoutput/Dir4';" + + "g = join d by $0, f by $0 using 'repl';" + + "h = join e by $0, f by $0 using 'repl';" + + "store g into 'file:///tmp/pigoutput/Dir4';" + + "store h into 'file:///tmp/pigoutput/Dir5';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-4.gld"); + } + + @Test + public void testStoreLoadSplit() throws Exception { + // Cases where segmenting into two DAGs is not straight forward due to Split. + // The Split operator is required in both the segments. 
+ + resetFileLocalizer(); + // Split operator as root vertex + String query = + "a = load 'file:///tmp/input';" + + "a1 = filter a by $0 == 5;" + + "store a1 into 'file:///tmp/pigoutput/Dir1';" + + "b = load 'file:///tmp/pigoutput/Dir1';" + + "c = join a by $0, b by $0;" + + "store c into 'file:///tmp/pigoutput/Dir2';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-5.gld"); + + // Split operator as intermediate vertex + query = + "a = load 'file:///tmp/input';" + + "a = distinct a;" + + "store a into 'file:///tmp/pigoutput/Dir1';" + + "b = load 'file:///tmp/pigoutput/Dir1';" + + "c = join a by $0, b by $0;" + + "store c into 'file:///tmp/pigoutput/Dir2';"; + + resetScope(); + resetFileLocalizer(); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-LoadStore-6.gld"); } @Test @@ -129,7 +210,7 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = native 'hadoop-examples.jar' Store a into '/tmp/table_testNativeMRJobSimple_input' Load '/tmp/table_testNativeMRJobSimple_output' `wordcount /tmp/table_testNativeMRJobSimple_input /tmp/table_testNativeMRJobSimple_output`;" + - "store b into 'file:///tmp/output';"; + "store b into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Native-1.gld"); } @@ -140,7 +221,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = filter a by x > 0;" + "c = foreach b generate y;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Filter-1.gld"); } @@ -151,7 +232,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = group a by x;" + "c = foreach b generate group, COUNT(a.x);" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Group-1.gld"); } 
@@ -163,12 +244,131 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = join a by x, b by x;" + "d = foreach c generate a::x as x, y, z;" + - "store d into 'file:///tmp/output';"; + "store d into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Join-1.gld"); } @Test + public void testBloomJoin() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x, y:int);" + + "b = load 'file:///tmp/input2' as (x, z:int);" + + "c = load 'file:///tmp/input2' as (x, w:int);" + + "d = join b by x, a by x, c by x using 'bloom';" + + "e = foreach d generate a::x as x, y, z, w;" + + "store e into 'file:///tmp/pigoutput';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld"); + } + + @Test + public void testBloomJoinLeftOuter() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:chararray, y:int);" + + "b = load 'file:///tmp/input2' as (x:chararray, z:int);" + + "d = join a by x left, b by x using 'bloom';" + + "e = foreach d generate a::x as x, y, z;" + + "store e into 'file:///tmp/pigoutput';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld"); + } + + @Test + public void testBloomJoinUnion() throws Exception { + // Left input from a union + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = load 'file:///tmp/input2' as (x:int, z:int);" + + "c = load 'file:///tmp/input3' as (x:int, z:int);" + + "b = union b, c;" + + "d = join a by x, b by x using 'bloom';" + + "e = foreach d generate a::x as x, y, z;" + + "store e into 
'file:///tmp/pigoutput';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld"); + setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null); + + resetScope(); + // Right input from a union + query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = load 'file:///tmp/input2' as (x:int, z:int);" + + "c = load 'file:///tmp/input3' as (x:int, z:int);" + + "b = union b, c;" + + "d = join b by x, a by x using 'bloom';" + + "e = foreach d generate a::x as x, y, z;" + + "store e into 'file:///tmp/pigoutput';"; + + // Needs shared edges and PIG-3856 to be a more optimal plan + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld"); + } + + @Test + public void testBloomJoinSplit() throws Exception { + // Left input from a split + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = load 'file:///tmp/input2' as (x:int, z:int);" + + "a1 = filter a by x == 3;" + + "a2 = filter a by x == 4;" + + "d = join a1 by x, a2 by x, b by x using 'bloom';" + + "e = foreach d generate a1::x as x, a1::y as y1, a2::y as y2, z;" + + "store e into 'file:///tmp/pigoutput';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld"); + setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, null); + + resetScope(); + // Right input from a split + query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = load 'file:///tmp/input2' as (x:int, 
z:int);" + + "a1 = filter a by x == 3;" + + "a2 = filter a by x == 4;" + + "d = join b by x, a1 by x using 'bloom';" + + "e = foreach d generate a1::x as x, y, z;" + + "store a2 into 'file:///tmp/pigoutput/a2';" + + "store e into 'file:///tmp/pigoutput/e';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld"); + } + + @Test + public void testBloomSelfJoin() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = filter a by x < 5;" + + "c = filter a by x == 10;" + + "d = filter a by x > 10;" + + "e = join b by x, c by x, d by x using 'bloom';" + + "store e into 'file:///tmp/pigoutput';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce"); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld"); + } + + @Test public void testSelfJoin() throws Exception { String query = "a = load 'file:///tmp/input1' as (x:int, y:int);" + @@ -176,7 +376,7 @@ public class TestTezCompiler { "c = filter a by x == 10;" + "d = filter a by x > 10;" + "e = join b by x, c by x, d by x;" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-1.gld"); } @@ -188,7 +388,7 @@ public class TestTezCompiler { "b = filter a by x < 5;" + "c = filter a by x == 10;" + "d = join b by x, c by x using 'skewed';" + - "store d into 'file:///tmp/output';"; + "store d into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld"); } @@ -201,7 +401,7 @@ public class TestTezCompiler { "c = filter a by x == 10;" + "d = filter a by x > 10;" + "e = join b by x, c 
by x, d by x using 'replicated';" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-3.gld"); } @@ -213,7 +413,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = union a, b;" + "d = join b by x, c by x using 'replicated';" + - "store d into 'file:///tmp/output';"; + "store d into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-4.gld"); } @@ -226,7 +426,7 @@ public class TestTezCompiler { "a2 = filter a by x < 2;" + "b = union a1, a2;" + "c = join b by x, a by x;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-5.gld"); } @@ -242,7 +442,7 @@ public class TestTezCompiler { "a5 = foreach a4 generate a2::x as x, a3::y as y;" + "b = union a1, a5;" + "c = join b by x, a by x;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-6.gld"); } @@ -254,7 +454,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = cross a, b;" + "d = foreach c generate a::x as x, y, z;" + - "store d into 'file:///tmp/output';"; + "store d into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-1.gld"); } @@ -266,7 +466,7 @@ public class TestTezCompiler { "b = filter a by x < 5;" + "c = filter a by x == 10;" + "d = cross b, c;" + - "store d into 'file:///tmp/output';"; + "store d into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-2.gld"); } @@ -278,7 +478,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = cross b, a;" + "d = foreach c generate a.x, a.y, z;" + //Scalar - "store d into 'file:///tmp/output';"; + "store 
d into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-3.gld"); } @@ -290,7 +490,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = join a by x, b by x using 'skewed';" + "d = foreach c generate a::x as x, y, z;" + - "store d into 'file:///tmp/output';"; + "store d into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld"); } @@ -303,7 +503,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = join a by x, b by x using 'skewed';" + "d = foreach c generate a::x as x, y, z;" + - "store d into 'file:///tmp/output';"; + "store d into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld"); } @@ -314,7 +514,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = limit a 10;" + "c = foreach b generate y;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-1.gld"); } @@ -325,7 +525,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = order a by x, y;" + "c = limit b 10;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld"); } @@ -338,18 +538,31 @@ public class TestTezCompiler { "g = group a all;" + "h = foreach g generate COUNT(a) as sum;" + "c = limit b h.sum/2;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-3.gld"); } @Test + public void testLimitReplJoin() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:int);" + + "b = load 'file:///tmp/input' as (x:int, y:int);" + + "c = limit a 1;" + + "d = join c by x, b by x 
using 'replicated';" + + "store a into 'file:///tmp/pigoutput/a';" + + "store d into 'file:///tmp/pigoutput/d';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-4.gld"); + } + + @Test public void testDistinct() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = distinct a;" + "c = foreach b generate y;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-1.gld"); } @@ -360,7 +573,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = group a by x;" + "c = foreach b { d = distinct a; generate COUNT(d); };" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Distinct-2.gld"); } @@ -374,7 +587,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = load 'file:///tmp/input3' as (x:int, z:int);" + "d = join a by x, b by x, c by x using 'replicated';" + - "store d into 'file:///tmp/output/d';"; + "store d into 'file:///tmp/pigoutput/d';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-1.gld"); } @@ -387,7 +600,7 @@ public class TestTezCompiler { "b1 = foreach b generate group, COUNT(a.y);" + "c = load 'file:///tmp/input2' as (x:int, z:int);" + "d = join b1 by group, c by x using 'replicated';" + - "store d into 'file:///tmp/output/e';"; + "store d into 'file:///tmp/pigoutput/e';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-FRJoin-2.gld"); } @@ -397,7 +610,7 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" + "b = stream a through `stream.pl -n 5`;" + - "STORE b INTO 'file:///tmp/output';"; + "STORE b INTO 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Stream-1.gld"); } @@ -408,7 
+621,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int, z:int);" + "b = group a by $0;" + "c = foreach b { d = limit a 10; e = order d by $1; f = order e by $0; generate group, f;};"+ - "store c INTO 'file:///tmp/output';"; + "store c INTO 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SecKeySort-1.gld"); @@ -422,7 +635,7 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" + "b = order a by x;" + - "STORE b INTO 'file:///tmp/output';"; + "STORE b INTO 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld"); } @@ -433,7 +646,7 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, y:int);" + "b = filter a by x == 1;" + "c = order b by x;" + - "STORE c INTO 'file:///tmp/output';"; + "STORE c INTO 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld"); } @@ -444,7 +657,7 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' using org.apache.pig.backend.hadoop.hbase.HBaseStorage(',') as (x:int, y:int);" + "b = order a by x;" + - "STORE b INTO 'file:///tmp/output';"; + "STORE b INTO 'file:///tmp/pigoutput';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-3.gld"); setProperty("pig.sort.readonce.loadfuncs", null); @@ -459,7 +672,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input2' as (x:int, z:int);" + "c = cogroup a by x, b by x;" + "d = foreach c generate group, COUNT(a.y), COUNT(b.z);" + - "store d into 'file:///tmp/output/d';"; + "store d into 'file:///tmp/pigoutput/d';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cogroup-1.gld"); } @@ -469,9 +682,9 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + "split a into b if x <= 5, c if x <= 
10, d if x >10;" + - "store b into 'file:///tmp/output/b';" + - "store c into 'file:///tmp/output/c';" + - "store d into 'file:///tmp/output/d';"; + "store b into 'file:///tmp/pigoutput/b';" + + "store c into 'file:///tmp/pigoutput/c';" + + "store d into 'file:///tmp/pigoutput/d';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-1.gld"); @@ -500,14 +713,14 @@ public class TestTezCompiler { // Needs to be removed in Tez plan as well. "f1 = limit f 1;" + "f2 = union d1, f1;" + - "store b1 into 'file:///tmp/output/b1';" + - "store b2 into 'file:///tmp/output/b2';" + - "store c1 into 'file:///tmp/output/c1';" + - "store c3 into 'file:///tmp/output/c1';" + - "store d1 into 'file:///tmp/output/d1';" + - "store e1 into 'file:///tmp/output/e1';" + - "store f1 into 'file:///tmp/output/f1';" + - "store f2 into 'file:///tmp/output/f2';"; + "store b1 into 'file:///tmp/pigoutput/b1';" + + "store b2 into 'file:///tmp/pigoutput/b2';" + + "store c1 into 'file:///tmp/pigoutput/c1';" + + "store c3 into 'file:///tmp/pigoutput/c1';" + + "store d1 into 'file:///tmp/pigoutput/d1';" + + "store e1 into 'file:///tmp/pigoutput/e1';" + + "store f1 into 'file:///tmp/pigoutput/f1';" + + "store f2 into 'file:///tmp/pigoutput/f2';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld"); @@ -523,8 +736,8 @@ public class TestTezCompiler { "b = foreach b generate group, COUNT(a.x);" + "c = group a by (x,y);" + "c = foreach c generate group, COUNT(a.y);" + - "store b into 'file:///tmp/output/b';" + - "store c into 'file:///tmp/output/c';"; + "store b into 'file:///tmp/pigoutput/b';" + + "store c into 'file:///tmp/pigoutput/c';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-3.gld"); @@ -540,9 +753,9 @@ public class TestTezCompiler { "c = join a by x, b by x;" + 
"d = foreach c generate $0, $1, $3;" + "e = foreach c generate $0, $1, $2, $3;" + - "store c into 'file:///tmp/output/c';" + - "store d into 'file:///tmp/output/d';" + - "store e into 'file:///tmp/output/e';"; + "store c into 'file:///tmp/pigoutput/c';" + + "store d into 'file:///tmp/pigoutput/d';" + + "store e into 'file:///tmp/pigoutput/e';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-4.gld"); @@ -555,13 +768,13 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input' as (x:int, y:int);" + "b = group a by x;" + //b: {group: int,a: {(x: int,y: int)}} - "store b into 'file:///tmp/output/b';" + + "store b into 'file:///tmp/pigoutput/b';" + "c = foreach b generate a.x, a.y;" + //c: {{(x: int)},{(y: int)}} - "store c into 'file:///tmp/output/c';" + + "store c into 'file:///tmp/pigoutput/c';" + "d = foreach b GENERATE FLATTEN(a);" + //d: {a::x: int,a::y: int} - "store d into 'file:///tmp/output/d';" + + "store d into 'file:///tmp/pigoutput/d';" + "e = foreach d GENERATE a::x, a::y;" + - "store e into 'file:///tmp/output/e';"; + "store e into 'file:///tmp/pigoutput/e';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-5.gld"); @@ -576,8 +789,8 @@ public class TestTezCompiler { "b = group a by x;" + "c = foreach b generate group, COUNT(a) as cnt;" + "SPLIT a into d if (2 * c.cnt) < y, e OTHERWISE;" + - "store d into 'file:///tmp/output1';" + - "store e into 'file:///tmp/output2';"; + "store d into 'file:///tmp/pigoutput1';" + + "store e into 'file:///tmp/pigoutput2';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6.gld"); @@ -594,7 +807,7 @@ public class TestTezCompiler { "c = join a by $0, b by $0 using 'replicated';" + "d = join a by $1, b by $1 using 'replicated';" + "e = union c,d;" + - "store e into 
'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7.gld"); @@ -613,7 +826,7 @@ public class TestTezCompiler { "c = foreach c generate $0 as c1;" + "d = group a by x;" + "e = foreach d generate group, b.b1, c.c1;" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-8.gld"); @@ -623,18 +836,67 @@ public class TestTezCompiler { } @Test + public void testMultiQueryMultipleReplicateJoinWithUnion() throws Exception { + // Replicate joins are from a split + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = load 'file:///tmp/input2' as (x:int, y:int);" + + "c = load 'file:///tmp/input3' as (x:int, y:int);" + + "d = union a, b;" + + "e = filter c by y < 2;" + + "f = filter c by y > 5;" + + "g = join d by x, e by x using 'replicated';" + + "h = join g by d::x, f by x using 'replicated';" + + "store h into 'file:///tmp/pigoutput';"; + + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-9-OPTOFF.gld"); + + // Union is also from a split + query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = filter a by x == 2;" + + "c = load 'file:///tmp/input3' as (x:int, y:int);" + + "d = union a, b;" + + "e = filter c by y < 2;" + + "f = filter c by y > 5;" + + "g = join d by x, e by x using 'replicated';" + + "h = join g by d::x, f by x using 'replicated';" + + "store h into 'file:///tmp/pigoutput';"; + + resetScope(); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true); + run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-10-OPTOFF.gld"); + } + + @Test public void testUnionStore() throws Exception { String query = "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld"); resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + + "c = union onschema a, b PARALLEL 15;" + + "store c into 'file:///tmp/pigoutput';"; + // Union optimization should be turned off if PARALLEL clause is specified + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); } @Test @@ -643,14 +905,15 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + - "store c into 'file:///tmp/output';"; + "store c into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); String oldSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS); String oldUnSupported = getProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName()); - // Plan should not have union optimization applied + // Plan should 
not have union optimization applied as PigStorage is unsupported run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-OPTOFF.gld"); + resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, null); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, OrcStorage.class.getName()); @@ -658,27 +921,37 @@ public class TestTezCompiler { "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + - "store c into 'file:///tmp/output' using " + DummyStoreWithOutputFormat.class.getName() + "();"; - // Plan should not have union optimization applied + "store c into 'file:///tmp/pigoutput' using " + DummyStoreWithOutputFormat.class.getName() + "();"; + // Plan should not have union optimization applied as only ORC is supported run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld"); resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null); + query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = load 'file:///tmp/input' as (y:chararray, x:int);" + + "c = union onschema a, b;" + + "store c into 'file:///tmp/pigoutput' using " + TestDummyStoreFunc.class.getName() + "();"; + // Plan should not have union optimization applied as supportsParallelWriteToStoreLocation returns false + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore2-OPTOFF.gld"); + + resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, PigStorage.class.getName()); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, null); query = "a = load 'file:///tmp/input' as (x:int, y:chararray);" + "split a into b if x > 5, c if x == 7, d if x == 8, e otherwise;" + "u1 = union onschema b, c;" + - "store u1 into 'file:///tmp/output/u1';" + + "store u1 into 'file:///tmp/pigoutput/u1';" + //TODO: multiple levels of 
split not merged "u2 = union onschema a, b, c;" + - "store u2 into 'file:///tmp/output/u2';" + + "store u2 into 'file:///tmp/pigoutput/u2';" + "u3 = union onschema d, e;" + - "store u3 into 'file:///tmp/output/u3';" + + "store u3 into 'file:///tmp/pigoutput/u3';" + "j1 = join d by x, a by x using 'replicated';" + "j1 = foreach j1 generate d::x as x, d::y as y;" + "u4 = union onschema j1, a;" + - "store u4 into 'file:///tmp/output/u4';"; + "store u4 into 'file:///tmp/pigoutput/u4';"; // Plan should have union optimization applied even for unsupported storefunc run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld"); @@ -696,7 +969,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = group c by x;" + "e = foreach d generate group, SUM(c.y);" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld"); @@ -712,7 +985,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join c by x, d by x;" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld"); @@ -729,7 +1002,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join c by x, d by x using 'replicated';" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; //TODO: PIG-3856 Not optimized setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); @@ -743,7 +1016,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join d by x, c by x using 'replicated';" + - "store e into 
'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; // Optimized setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); @@ -762,7 +1035,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join c by x, d by x using 'skewed';" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld"); @@ -779,7 +1052,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + "d = order c by x;" + - "store d into 'file:///tmp/output';"; + "store d into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld"); @@ -795,7 +1068,7 @@ public class TestTezCompiler { "b = load 'file:///tmp/input' as (y:chararray, x:int);" + "c = union onschema a, b;" + "d = limit c 1;" + - "store d into 'file:///tmp/output';"; + "store d into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld"); @@ -811,9 +1084,9 @@ public class TestTezCompiler { "split a into a1 if x > 100, a2 otherwise;" + "c = union onschema a1, a2, b;" + "split c into d if x > 500, e otherwise;" + - "store a2 into 'file:///tmp/output/a2';" + - "store d into 'file:///tmp/output/d';" + - "store e into 'file:///tmp/output/e';"; + "store a2 into 'file:///tmp/pigoutput/a2';" + + "store d into 'file:///tmp/pigoutput/d';" + + "store e into 'file:///tmp/pigoutput/e';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld"); @@ -831,8 +1104,8 @@ public class TestTezCompiler { "d = load 'file:///tmp/input1' as (x:int, y:chararray);" + 
"e = union onschema c, d;" + "f = group e by x;" + - "store e into 'file:///tmp/output1';" + - "store f into 'file:///tmp/output2';"; + "store e into 'file:///tmp/pigoutput1';" + + "store f into 'file:///tmp/pigoutput2';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld"); @@ -850,7 +1123,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, y:chararray);" + "e = union onschema c, d;" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld"); @@ -872,10 +1145,10 @@ public class TestTezCompiler { "c2 = foreach c generate y, x;" + "c3 = union c1, c2;" + "a1 = union onschema b3, c3;" + - "store a1 into 'file:///tmp/output1';" + + "store a1 into 'file:///tmp/pigoutput1';" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join a1 by x, d by x using 'skewed';" + - "store e into 'file:///tmp/output2';"; + "store e into 'file:///tmp/pigoutput2';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12.gld"); @@ -892,7 +1165,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join c by x, d by x using 'replicated';" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld"); @@ -906,7 +1179,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join d by x, c by x using 'replicated';" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; 
resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); @@ -924,7 +1197,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join c by x, d by x using 'skewed';" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld"); @@ -938,7 +1211,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = join d by x, c by x using 'skewed';" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); @@ -956,7 +1229,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = filter c by x == d.x;" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-17.gld"); @@ -970,7 +1243,7 @@ public class TestTezCompiler { "c = union onschema a, b;" + "d = load 'file:///tmp/input1' as (x:int, z:chararray);" + "e = filter d by x == c.x;" + - "store e into 'file:///tmp/output';"; + "store e into 'file:///tmp/pigoutput';"; resetScope(); setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); @@ -981,11 +1254,76 @@ public class TestTezCompiler { } @Test + public void testUnionSplitUnionStore() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:chararray);" + + "b = load 'file:///tmp/input1' as (y:chararray, x:int);" + + "c = union onschema a, b;" + + "split c into d if x <= 5, e if x <= 10, f if x >10, g if y == '6';" + + "h = union onschema d, e;" + + "i = union onschema f, g;" + + "store h into 
'file:///tmp/pigoutput/1';" + + "store i into 'file:///tmp/pigoutput/2';"; + + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-19-OPTOFF.gld"); + + // With a join in between + query = + "a = load 'file:///tmp/input' as (x:chararray);" + + "b = load 'file:///tmp/input' as (x:chararray);" + + "c = load 'file:///tmp/input' as (y:chararray);" + + "u1 = union onschema a, b;" + + "SPLIT u1 INTO r IF x != '', s OTHERWISE;" + + "d = JOIN r BY x LEFT, c BY y;" + + "u2 = UNION ONSCHEMA d, s;" + + "e = FILTER u2 BY x == '';" + + "f = FILTER u2 BY x == 'm';" + + "u3 = UNION ONSCHEMA e, f;" + + "store u3 into 'file:///tmp/pigoutput';"; + + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-20-OPTOFF.gld"); + } + + @Test + public void testUnionSplitUnionLimitStore() throws Exception { + // Similar to previous testcase but a LIMIT at the end to test a non-store vertex group + String query = + "a = load 'file:///tmp/input' as (x:chararray);" + + "b = load 'file:///tmp/input' as (x:chararray);" + + "c = load 'file:///tmp/input' as (y:chararray);" + + "u1 = union onschema a, b;" + + "SPLIT u1 INTO r IF x != '', s OTHERWISE;" + + "d = JOIN r BY x LEFT, c BY y;" + + "u2 = UNION ONSCHEMA d, s;" + + "e = FILTER u2 BY x == '';" + + "f = FILTER u2 BY x == 'm';" + + "u3 = UNION ONSCHEMA e, f;" + + "SPLIT u3 INTO t if x != '', u OTHERWISE;" + + "v = LIMIT t 10;" + + "store v into 'file:///tmp/pigoutput';"; + + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + true); + run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21.gld"); + resetScope(); + setProperty(PigConfiguration.PIG_TEZ_OPT_UNION, "" + false); + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-21-OPTOFF.gld"); + } + + @Test public void testRank() throws Exception { String query = "a = load 'file:///tmp/input1' as (x:int, y:int);" + "b = rank a;" + - "store b into 'file:///tmp/output/d';"; + "store b into 'file:///tmp/pigoutput/d';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-1.gld"); } @@ -996,7 +1334,7 @@ public class TestTezCompiler { String query = "a = load 'file:///tmp/input1' as (x:int, y:int);" + "b = rank a by x;" + - "store b into 'file:///tmp/output/d';"; + "store b into 'file:///tmp/pigoutput/d';"; run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld"); } @@ -1052,5 +1390,32 @@ public class TestTezCompiler { assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)), TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean))); } + + public static class TestDummyStoreFunc extends StoreFunc { + + @Override + public OutputFormat getOutputFormat() throws IOException { + return null; + } + + @Override + public void setStoreLocation(String location, Job job) + throws IOException { + } + + @Override + public void prepareToWrite(RecordWriter writer) throws IOException { + } + + @Override + public void putNext(Tuple t) throws IOException { + } + + @Override + public Boolean supportsParallelWriteToStoreLocation() { + return false; + } + + } }