Author: olga
Date: Wed Nov 11 18:28:48 2009
New Revision: 835005
URL: http://svn.apache.org/viewvc?rev=835005&view=rev
Log:
PIG-1080: PigStorage may miss records when loading a file (rding via olgan)
Added:
hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=835005&r1=835004&r2=835005&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Nov 11 18:28:48 2009
@@ -115,6 +115,8 @@
BUG FIXES
+PIG-1080: PigStorage may miss records when loading a file (rding via olgan)
+
PIG-1071: Support comma separated file/directory names in load statements
(rding via pradeepkth)
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java?rev=835005&r1=835004&r2=835005&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java Wed Nov 11 18:28:48 2009
@@ -88,11 +88,24 @@
throw new ExecException(msg, errCode, PigException.BUG, exp);
}
}
+
+ end = start + getLength();
+
+ // Since we are not block aligned and we throw away the first
+ // record (counting on a different instance to read it), we make the
+ // successor block overlap one byte with the previous block when
+ // using PigStorage for uncompressed text files (which in turn
+ // uses Hadoop LineRecordReader).
+ if (start != 0 && (loader instanceof PigStorage)) {
+ if (!(file.endsWith(".bz") || file.endsWith(".bz2")) &&
+ !(file.endsWith(".gz"))) {
+ --start;
+ }
+ }
+
fsis = base.asElement(base.getActiveContainer(), file).sopen();
fsis.seek(start, FLAGS.SEEK_CUR);
- end = start + getLength();
-
if (file.endsWith(".bz") || file.endsWith(".bz2")) {
is = new CBZip2InputStream(fsis, 9);
} else if (file.endsWith(".gz")) {
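
Note on the change above: the one-byte overlap follows the usual convention for
splitting line-oriented text. Every reader except the one starting at offset 0
throws away its first (possibly partial) line and instead reads one line past
its own end, so a record that begins exactly on a split boundary is emitted by
exactly one reader. The standalone Java sketch below illustrates only that
convention; the class name SplitLineReaderSketch and the use of
RandomAccessFile are illustrative assumptions and are not part of PigSlice,
which delegates the actual line reading to Hadoop's LineRecordReader for
uncompressed text files.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of the split-boundary convention: every split except the
 * first backs up one byte, discards the first (possibly partial) line, and
 * then reads lines until the next line would start at or past the split end.
 */
public class SplitLineReaderSketch {

    /** Returns the lines that belong to the byte range [start, start + length). */
    public static List<String> readSplit(String path, long start, long length)
            throws IOException {
        List<String> lines = new ArrayList<String>();
        RandomAccessFile in = new RandomAccessFile(path, "r");
        try {
            long end = start + length;
            if (start != 0) {
                // Overlap one byte with the previous split and discard the
                // first (possibly partial) line; the previous split's reader
                // is responsible for emitting it.
                in.seek(start - 1);
                in.readLine();
            }
            // Read lines whose first byte lies before the split end. The last
            // line may run past 'end'; that is exactly the record the next
            // split throws away.
            while (in.getFilePointer() < end) {
                String line = in.readLine();
                if (line == null) {
                    break;  // end of file
                }
                lines.add(line);
            }
        } finally {
            in.close();
        }
        return lines;
    }
}

With the nine 10-byte records used by the new test and 20-byte splits,
readSplit(f, 0, 20) would return the first two records and readSplit(f, 20, 20)
the next two. Without backing up one byte, the second call would discard the
record starting exactly at offset 20 as its "first partial line", which is the
record loss reported in PIG-1080.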
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=835005&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStorage.java Wed Nov 11 18:28:48 2009
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.MAPREDUCE;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPigStorage {
+
+ protected final Log log = LogFactory.getLog(getClass());
+
+ private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static PigServer pigServer = null;
+
+
+ @BeforeClass
+ public static void setup() {
+ try {
+ pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
+ } catch (ExecException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ pigServer.shutdown();
+ }
+
+ @Test
+ public void testBlockBoundary() {
+
+ // This tests the PigStorage loader with records falling exactly
+ // on the boundary of the file blocks: each record below is 10 bytes
+ // (9 characters plus a newline) and the block size is overridden to 20.
+ String[] inputs = {
+ "abcdefgh1", "abcdefgh2", "abcdefgh3",
+ "abcdefgh4", "abcdefgh5", "abcdefgh6",
+ "abcdefgh7", "abcdefgh8", "abcdefgh9"
+ };
+
+ String[] expected = {
+ "(abcdefgh1)", "(abcdefgh2)", "(abcdefgh3)",
+ "(abcdefgh4)", "(abcdefgh5)", "(abcdefgh6)",
+ "(abcdefgh7)", "(abcdefgh8)", "(abcdefgh9)"
+ };
+
+ System.setProperty("pig.overrideBlockSize", "20");
+
+ String INPUT_FILE = "tmp.txt";
+
+ try {
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ for (String s : inputs) {
+ w.println(s);
+ }
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ pigServer.registerQuery("a = load 'file:" + INPUT_FILE + "';");
+
+ Iterator<Tuple> iter = pigServer.openIterator("a");
+ int counter = 0;
+ while (iter.hasNext()){
+ assertEquals(expected[counter++].toString(), iter.next().toString());
+ }
+
+ assertEquals(expected.length, counter);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ new File(INPUT_FILE).delete();
+ try {
+ Util.deleteFile(cluster, INPUT_FILE);
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
+
+}