Author: olga
Date: Wed Nov 11 18:28:48 2009
New Revision: 835005

URL: http://svn.apache.org/viewvc?rev=835005&view=rev
Log:
PIG-1080: PigStorage may miss records when loading a file (rding via olgan)

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=835005&r1=835004&r2=835005&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Nov 11 18:28:48 2009
@@ -115,6 +115,8 @@
 
 BUG FIXES
 
+PIG-1080: PigStorage may miss records when loading a file (rding via olgan)
+
 PIG-1071: Support comma separated file/directory names in load statements
 (rding via pradeepkth)
 

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java?rev=835005&r1=835004&r2=835005&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java Wed Nov 11 18:28:48 2009
@@ -88,11 +88,24 @@
                 throw new ExecException(msg, errCode, PigException.BUG, exp);
             }
         }
+        
+        end = start + getLength();
+        
+        // Since we are not block aligned and we throw away the first
+        // record (count on a different instance to read it), we make the 
+        // successor block overlap one byte with the previous block when
+        // using PigStorage for uncompressed text files (which in turn 
+        // uses Hadoop LineRecordReader).
+        if (start != 0 && (loader instanceof PigStorage)) {
+            if (!(file.endsWith(".bz") || file.endsWith(".bz2")) &&
+                    !(file.endsWith(".gz"))) {
+                --start;        
+            }
+        }
+     
         fsis = base.asElement(base.getActiveContainer(), file).sopen();
         fsis.seek(start, FLAGS.SEEK_CUR);
 
-        end = start + getLength();
-
         if (file.endsWith(".bz") || file.endsWith(".bz2")) {
             is = new CBZip2InputStream(fsis, 9);
         } else if (file.endsWith(".gz")) {

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=835005&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java Wed Nov 11 18:28:48 2009
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.MAPREDUCE;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPigStorage {
+        
+    protected final Log log = LogFactory.getLog(getClass());
+    
+    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static PigServer pigServer = null;
+    
+    
+    @BeforeClass
+    public static void setup() {
+        try {
+            pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
+        } catch (ExecException e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+    
+    @AfterClass
+    public static void shutdown() {
+        pigServer.shutdown();
+    }
+    
+    @Test
+    public void testBlockBoundary() {
+        
+        // This tests PigStorage loader with records exactly
+        // on the boundary of the file blocks.
+        String[] inputs = {
+                "abcdefgh1", "abcdefgh2", "abcdefgh3", 
+                "abcdefgh4", "abcdefgh5", "abcdefgh6",
+                "abcdefgh7", "abcdefgh8", "abcdefgh9"
+        };
+        
+        String[] expected = {
+                "(abcdefgh1)", "(abcdefgh2)", "(abcdefgh3)", 
+                "(abcdefgh4)", "(abcdefgh5)", "(abcdefgh6)",
+                "(abcdefgh7)", "(abcdefgh8)", "(abcdefgh9)"
+        };
+        
+        System.setProperty("pig.overrideBlockSize", "20");
+        
+        String INPUT_FILE = "tmp.txt";
+        
+        try {
+                                    
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            for (String s : inputs) {
+                w.println(s);
+            }
+            w.close();
+            
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+            
+            pigServer.registerQuery("a = load 'file:" + INPUT_FILE + "';");
+            
+            Iterator<Tuple> iter = pigServer.openIterator("a");
+            int counter = 0;
+            while (iter.hasNext()){
+                assertEquals(expected[counter++].toString(), iter.next().toString());
+            }
+            
+            assertEquals(expected.length, counter);
+        
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    } 
+
+}


Reply via email to