Author: gates
Date: Fri Apr 11 17:33:09 2008
New Revision: 647356

URL: http://svn.apache.org/viewvc?rev=647356&view=rev
Log:
PIG-204: Repair broken input splits.


Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=647356&r1=647355&r2=647356&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri Apr 11 17:33:09 2008
@@ -226,3 +226,5 @@
 
        PIG-203: Fix bug in parameter substitution code where any pig script 
over
        1k caused pig to freeze. (kali via gates)
+
+       PIG-204: Repair broken input splits (acmurthy via gates).

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=647356&r1=647355&r2=647356&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Fri Apr 11 17:33:09 2008
@@ -233,21 +233,28 @@
     private void connectInputs(OperatorKey[] compiledInputs, 
                                POMapreduce pom,
                                Map<OperatorKey, ExecPhysicalOperator> 
physicalOpTable) throws IOException {
-        // connect inputs (by merging operators, if possible; else connect via 
temp files)
+        // connect inputs (by merging operators, if possible; 
+        // else connect via temp files)
         for (int i = 0; i < compiledInputs.length; i++) {
-            if 
(okayToMergeWithBinaryOp((POMapreduce)physicalOpTable.get(compiledInputs[i]))) {
+            POMapreduce input = 
+                (POMapreduce)physicalOpTable.get(compiledInputs[i]);
+            
+            if (okayToMergeWithBinaryOp(input)) {
                 // can merge input i with this operator
-                
pom.addInputFile(((POMapreduce)physicalOpTable.get(compiledInputs[i])).getFileSpec(0),
 
-                                  
((POMapreduce)physicalOpTable.get(compiledInputs[i])).getEvalSpec(0));
-                
pom.addInputOperators(((PhysicalOperator)physicalOpTable.get(compiledInputs[i])).inputs);
+                pom.addInputFile(input.getFileSpec(0), input.getEvalSpec(0));
+                pom.addInputOperators(input.inputs);
             } else {
                 // chain together via a temp file
                 String tempFile = getTempFile(pigContext);
-                FileSpec fileSpec = new FileSpec( tempFile, 
BinStorage.class.getName());
-                
((POMapreduce)physicalOpTable.get(compiledInputs[i])).outputFileSpec = fileSpec;
+                FileSpec fileSpec = new FileSpec(tempFile, 
+                                                 BinStorage.class.getName());
+                input.outputFileSpec = fileSpec;
                 pom.addInputFile(fileSpec);
                 pom.addInputOperator(compiledInputs[i]);
             }
+            
+            // propagate input properties
+            pom.properties.putAll(input.properties);
         }
     }
 

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java?rev=647356&r1=647355&r2=647356&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java Fri Apr 11 17:33:09 2008
@@ -47,7 +47,7 @@
     @SuppressWarnings("unchecked")
     public InputSplit[] getSplits(JobConf job, int numSplits)
             throws IOException {
-        boolean isSplittable = job.getBoolean("pig.input.splittable", false);
+        boolean isSplittable = job.getBoolean("pig.input.splittable", true);
         ArrayList<FileSpec> inputs = (ArrayList<FileSpec>) ObjectSerializer
                 .deserialize(job.get("pig.inputs"));
         ArrayList<EvalSpec> mapFuncs = (ArrayList<EvalSpec>) ObjectSerializer

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java?rev=647356&r1=647355&r2=647356&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java Fri Apr 11 17:33:09 2008
@@ -243,6 +243,7 @@
         try{
             EvalSpec es = (EvalSpec) 
ObjectSerializer.deserialize(ObjectSerializer.serialize(this));
             es.instantiateFunc(pigContext);
+            es.properties.putAll(properties);
             return es;
         }catch(IOException e){
             throw new RuntimeException(e);


Reply via email to