Author: olga Date: Wed May 7 10:35:55 2008 New Revision: 654188 URL: http://svn.apache.org/viewvc?rev=654188&view=rev Log: PIG-230: support for shipping for multiple commands
Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=654188&r1=654187&r2=654188&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Wed May 7 10:35:55 2008 @@ -275,6 +275,9 @@ PIG-229: Proper error handling in case of deserializer failure + PIG-230: Handling shipment for multiple ship/cache commands (acmurthy via + olgan) + PIG-219: Change unit tests to run both local and map reduce modes (kali via gates). PIG-202: Fix Order by so that user provided comparator func is used for quantile determination (kali via gates). Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=654188&r1=654187&r2=654188&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Wed May 7 10:35:55 2008 @@ -63,6 +63,14 @@ public int mapParallelism = -1; // -1 means let hadoop decide public int reduceParallelism = -1; + /** + * A list of configs to be merged, not overwritten ... 
+ */ + private static String[] PIG_CONFIGS_TO_MERGE = + { + "pig.streaming.cache.files", + "pig.streaming.ship.files", + }; static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher(); @@ -158,7 +166,7 @@ public void addInputFile(FileSpec fileSpec, EvalSpec evalSpec){ inputFileSpecs.add(fileSpec); toMap.add(evalSpec); - properties.putAll(evalSpec.getProperties()); + mergeProperties(evalSpec.getProperties()); } @@ -249,7 +257,7 @@ else toMap.set(i, toMap.get(i).addSpec(spec)); - properties.putAll(spec.getProperties()); + mergeProperties(spec.getProperties()); } public void addReduceSpec(EvalSpec spec){ @@ -258,7 +266,7 @@ else toReduce = toReduce.addSpec(spec); - properties.putAll(spec.getProperties()); + mergeProperties(spec.getProperties()); } public void setProperty(String key, String value) { @@ -272,6 +280,27 @@ public void visit(POVisitor v) { v.visitMapreduce(this); } + + // TODO: Ugly hack! Need a better way to manage multiple properties + // Presumably it should be a part of Hadoop Configuration. 
+ private void mergeProperties(Properties other) { + Properties mergedProperties = new Properties(); + + for (String key : PIG_CONFIGS_TO_MERGE) { + String value = properties.getProperty(key); + String otherValue = other.getProperty(key); + + if (value != null && otherValue != null) { + mergedProperties.setProperty(key, value + ", " + otherValue); + } + } + + // Copy the other one + properties.putAll(other); + + // Now, overwrite with the merged one + properties.putAll(mergedProperties); + } } Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=654188&r1=654187&r2=654188&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Wed May 7 10:35:55 2008 @@ -57,6 +57,10 @@ private static void parseShipCacheSpecs(List<String> specs, Properties properties, String property) { + if (specs == null || specs.size() == 0) { + return; + } + // Setup streaming-specific properties StringBuffer sb = new StringBuffer(); Iterator<String> i = specs.iterator(); Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=654188&r1=654187&r2=654188&view=diff ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original) +++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Wed May 7 10:35:55 2008 @@ -202,7 +202,8 @@ " print STDERR \"STDERR: $_\n\";", "}", }; - File command = Util.createInputFile("script", "pl", script); + File command1 = Util.createInputFile("script", "pl", script); + File command2 = Util.createInputFile("script", "pl", script); // Expected results 
String[] expectedFirstFields = @@ -213,15 +214,21 @@ // Pig query to run pigServer.registerQuery( - "define CMD `" + command.getName() + " foo` " + - "ship ('" + command + "') " + + "define CMD1 `" + command1.getName() + " foo` " + + "ship ('" + command1 + "') " + "input('foo' using " + PigStorage.class.getName() + "(',')) " + "stderr();"); - + pigServer.registerQuery( + "define CMD2 `" + command2.getName() + " bar` " + + "ship ('" + command2 + "') " + + "input('bar' using " + PigStorage.class.getName() + "(',')) " + + "stderr();"); pigServer.registerQuery("IP = load 'file:" + input + "' using " + PigStorage.class.getName() + "(',');"); pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); - pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;"); + pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " + + "through CMD1;"); + pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;"); String output = "/pig/out"; pigServer.deleteFile(output);