Author: gates
Date: Fri Apr 11 17:33:09 2008
New Revision: 647356
URL: http://svn.apache.org/viewvc?rev=647356&view=rev
Log:
PIG-204: Repair broken input splits.
Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=647356&r1=647355&r2=647356&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Fri Apr 11 17:33:09 2008 @@ -226,3 +226,5 @@ PIG-203: Fix bug in parameter substitution code where any pig script over 1k caused pig to freeze. (kali via gates) + + PIG-204: Repair broken input splits (acmurthy via gates). Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=647356&r1=647355&r2=647356&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Fri Apr 11 17:33:09 2008 @@ -233,21 +233,28 @@ private void connectInputs(OperatorKey[] compiledInputs, POMapreduce pom, Map<OperatorKey, ExecPhysicalOperator> physicalOpTable) throws IOException { - // connect inputs (by merging operators, if possible; else connect via temp files) + // connect inputs (by merging operators, if possible; + // else connect via temp files) for (int i = 0; i < compiledInputs.length; i++) { - if (okayToMergeWithBinaryOp((POMapreduce)physicalOpTable.get(compiledInputs[i]))) { + POMapreduce input = + (POMapreduce)physicalOpTable.get(compiledInputs[i]); + 
+ if (okayToMergeWithBinaryOp(input)) { // can merge input i with this operator - pom.addInputFile(((POMapreduce)physicalOpTable.get(compiledInputs[i])).getFileSpec(0), - ((POMapreduce)physicalOpTable.get(compiledInputs[i])).getEvalSpec(0)); - pom.addInputOperators(((PhysicalOperator)physicalOpTable.get(compiledInputs[i])).inputs); + pom.addInputFile(input.getFileSpec(0), input.getEvalSpec(0)); + pom.addInputOperators(input.inputs); } else { // chain together via a temp file String tempFile = getTempFile(pigContext); - FileSpec fileSpec = new FileSpec( tempFile, BinStorage.class.getName()); - ((POMapreduce)physicalOpTable.get(compiledInputs[i])).outputFileSpec = fileSpec; + FileSpec fileSpec = new FileSpec(tempFile, + BinStorage.class.getName()); + input.outputFileSpec = fileSpec; pom.addInputFile(fileSpec); pom.addInputOperator(compiledInputs[i]); } + + // propagate input properties + pom.properties.putAll(input.properties); } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java?rev=647356&r1=647355&r2=647356&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java Fri Apr 11 17:33:09 2008 @@ -47,7 +47,7 @@ @SuppressWarnings("unchecked") public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - boolean isSplittable = job.getBoolean("pig.input.splittable", false); + boolean isSplittable = job.getBoolean("pig.input.splittable", true); ArrayList<FileSpec> inputs = (ArrayList<FileSpec>) ObjectSerializer .deserialize(job.get("pig.inputs")); ArrayList<EvalSpec> mapFuncs = (ArrayList<EvalSpec>) 
ObjectSerializer Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java?rev=647356&r1=647355&r2=647356&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java Fri Apr 11 17:33:09 2008 @@ -243,6 +243,7 @@ try{ EvalSpec es = (EvalSpec) ObjectSerializer.deserialize(ObjectSerializer.serialize(this)); es.instantiateFunc(pigContext); + es.properties.putAll(properties); return es; }catch(IOException e){ throw new RuntimeException(e);