Author: gates
Date: Tue Sep 15 23:59:00 2009
New Revision: 815571

URL: http://svn.apache.org/viewvc?rev=815571&view=rev
Log:
PIG-911 Added SequenceFileLoader to piggybank


Added:
    
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java
    
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java
Modified:
    hadoop/pig/trunk/contrib/CHANGES.txt

Modified: hadoop/pig/trunk/contrib/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=815571&r1=815570&r2=815571&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/CHANGES.txt Tue Sep 15 23:59:00 2009
@@ -1,3 +1,4 @@
+PIG-911: Added SequenceFileLoader (dryaboy via gates)
 PIG-885: New UDFs for piggybank (Bin, Decode, LookupInFiles, RegexExtract, 
RegexMatch, HashFVN, DiffDate) (daijy)
 PIG-868: added strin manipulation functions (bennies via olgan)
 PIG-273: addition of Top and SearchQuery UDFs (ankur via olgan)

Added: 
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java?rev=815571&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java
 (added)
+++ 
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java
 Tue Sep 15 23:59:00 2009
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.SamplableLoader;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.datastorage.DataStorage;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * A Loader for Hadoop-Standard SequenceFiles.
+ * able to work with the following types as keys or values:
+ * Text, IntWritable, LongWritable, FloatWritable, DoubleWritable, 
BooleanWritable, ByteWritable
+ **/
+
+public class SequenceFileLoader implements LoadFunc, SamplableLoader {
+  
+  private SequenceFile.Reader reader;
+  private long end;
+  private Writable key;
+  private Writable value;
+  private ArrayList<Object> mProtoTuple = null;
+  
+  protected static final Log LOG = LogFactory.getLog(SequenceFileLoader.class);
+  protected TupleFactory mTupleFactory = TupleFactory.getInstance();
+  protected SerializationFactory serializationFactory;
+
+  protected byte keyType;
+  protected byte valType;
+    
+  public SequenceFileLoader() {
+  
+  }
+  
+  @Override
+  public void bindTo(String fileName, BufferedPositionedInputStream is,
+      long offset, long end) throws IOException {
+    
+    inferReader(fileName);
+    if (offset != 0)
+      reader.sync(offset);
+
+    this.end = end;
+    
+    try {
+      this.key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), 
PigMapReduce.sJobConf);
+      this.value = (Writable) 
ReflectionUtils.newInstance(reader.getValueClass(), PigMapReduce.sJobConf);
+    } catch (ClassCastException e) {
+      throw new RuntimeException("SequenceFile contains non-Writable objects", 
e);
+    }
+    setKeyValueTypes(key.getClass(), value.getClass());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Schema determineSchema(String fileName, ExecType execType,
+      DataStorage storage) throws IOException {
+    inferReader(fileName);
+    Class<Writable> keyClass = null;
+    Class<Writable> valClass= null;
+    try {
+      keyClass = (Class<Writable>) reader.getKeyClass();
+      valClass = (Class<Writable>) reader.getValueClass();
+    } catch (ClassCastException e) {
+      throw new RuntimeException("SequenceFile contains non-Writable objects", 
e);
+    }
+    Schema schema = new Schema();
+    setKeyValueTypes(keyClass, valClass);  
+    schema.add(new Schema.FieldSchema(null, keyType));
+    schema.add(new Schema.FieldSchema(null, valType));
+    return schema;
+  }
+
+  protected void setKeyValueTypes(Class<?> keyClass, Class<?> valueClass) 
throws BackendException {
+    this.keyType |= inferPigDataType(keyClass);
+    this.valType |= inferPigDataType(valueClass);
+    if (keyType == DataType.ERROR) { 
+      LOG.warn("Unable to translate key "+key.getClass()+" to a Pig datatype");
+      throw new BackendException("Unable to translate "+key.getClass()+" to a 
Pig datatype");
+    } 
+    if (valType == DataType.ERROR) {
+      LOG.warn("Unable to translate value "+value.getClass()+" to a Pig 
datatype");
+      throw new BackendException("Unable to translate "+value.getClass()+" to 
a Pig datatype");
+    }
+
+  }
+  protected void inferReader(String fileName) throws IOException {
+    if (reader == null) {
+      Configuration conf = new Configuration();
+      Path path = new Path(fileName);
+      FileSystem fs = FileSystem.get(path.toUri(), conf);
+      reader = new SequenceFile.Reader(fs, path, conf);
+    }
+  }
+  
+  protected byte inferPigDataType(Type t) {
+    if (t == DataByteArray.class) return DataType.BYTEARRAY;
+    else if (t == Text.class) return DataType.CHARARRAY;
+    else if (t == IntWritable.class) return DataType.INTEGER;
+    else if (t == LongWritable.class) return DataType.LONG;
+    else if (t == FloatWritable.class) return DataType.FLOAT;
+    else if (t == DoubleWritable.class) return DataType.DOUBLE;
+    else if (t == BooleanWritable.class) return DataType.BOOLEAN;
+    else if (t == ByteWritable.class) return DataType.BYTE;
+    // not doing maps or other complex types for now
+    else return DataType.ERROR;
+  }
+  
+  protected Object translateWritableToPigDataType(Writable w, byte dataType) {
+    switch(dataType) {
+      case DataType.CHARARRAY: return ((Text) w).toString();
+      case DataType.BYTEARRAY: return((DataByteArray) w).get();
+      case DataType.INTEGER: return ((IntWritable) w).get();
+      case DataType.LONG: return ((LongWritable) w).get();
+      case DataType.FLOAT: return ((FloatWritable) w).get();
+      case DataType.DOUBLE: return ((DoubleWritable) w).get();
+      case DataType.BYTE: return ((ByteWritable) w).get();
+    }
+    
+    return null;
+  }
+  
+  @Override
+  public void fieldsToRead(Schema schema) {
+    // not implemented
+
+  }
+
+  @Override
+  public Tuple getNext() throws IOException {
+    if (mProtoTuple == null) mProtoTuple = new ArrayList<Object>(2);
+    if (reader != null && (reader.getPosition() < end || !reader.syncSeen()) 
&& reader.next(key, value)) {
+      mProtoTuple.add(translateWritableToPigDataType(key, keyType));
+      mProtoTuple.add(translateWritableToPigDataType(value, valType));
+      Tuple t =  mTupleFactory.newTuple(mProtoTuple);
+      mProtoTuple.clear();
+      return t;
+    }
+    return null;
+  }
+
+  @Override
+  public long getPosition() throws IOException {
+    return reader.getPosition();
+  }
+
+  @Override
+  public Tuple getSampledTuple() throws IOException {
+    return this.getNext();
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    long startPos = reader.getPosition();
+    reader.sync(startPos+n);
+    return reader.getPosition()-startPos;
+  }
+
+  @Override
+  public DataBag bytesToBag(byte[] b) throws IOException {
+    throw new FrontendException("SequenceFileLoader does not expect to cast 
data.");
+  }
+
+  @Override
+  public String bytesToCharArray(byte[] b) throws IOException {
+    throw new FrontendException("SequenceFileLoader does not expect to cast 
data.");
+  }
+
+  @Override
+  public Double bytesToDouble(byte[] b) throws IOException {
+    throw new FrontendException("SequenceFileLoader does not expect to cast 
data.");
+  }
+
+  @Override
+  public Float bytesToFloat(byte[] b) throws IOException {
+    throw new FrontendException("SequenceFileLoader does not expect to cast 
data.");
+  }
+
+  @Override
+  public Integer bytesToInteger(byte[] b) throws IOException {
+    throw new FrontendException("SequenceFileLoader does not expect to cast 
data.");
+  }
+
+  @Override
+  public Long bytesToLong(byte[] b) throws IOException {
+    throw new FrontendException("SequenceFileLoader does not expect to cast 
data.");
+  }
+
+  @Override
+  public Map<String, Object> bytesToMap(byte[] b) throws IOException {
+    throw new FrontendException("SequenceFileLoader does not expect to cast 
data.");
+  }
+
+  @Override
+  public Tuple bytesToTuple(byte[] b) throws IOException {
+    throw new FrontendException("SequenceFileLoader does not expect to cast 
data.");
+  }
+}

Added: 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java?rev=815571&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java
 (added)
+++ 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestSequenceFileLoader.java
 Tue Sep 15 23:59:00 2009
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import static org.apache.pig.ExecType.LOCAL;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+import org.junit.Test;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+//import org.apache.pig.test.PigExecTestCase;
+import org.apache.pig.test.Util;
+
+//public class TestSequenceFileLoader extends PigExecTestCase  {
+  public class TestSequenceFileLoader extends TestCase {
+  private static final String[] DATA = {
+    "one, two, buckle my shoe",
+    "three, four, shut the door",
+    "five, six, something else" };
+  
+  private static final String[][] EXPECTED = {
+    {"0", "one, two, buckle my shoe"},
+    {"1", "three, four, shut the door"},
+    {"2", "five, six, something else"}
+  };
+  
+  private String tmpFileName;
+  
+  private PigServer pigServer;
+  @Override
+  public void setUp() throws Exception {
+    pigServer = new PigServer(LOCAL);
+    File tmpFile = File.createTempFile("test", ".txt");
+    tmpFileName = tmpFile.getAbsolutePath();
+    System.err.println("fileName: "+tmpFileName);
+    Path path = new Path("file:///"+tmpFileName);
+    JobConf conf = new JobConf();
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    
+    IntWritable key = new IntWritable();
+    Text value = new Text();
+    SequenceFile.Writer writer = null;
+    try {
+      writer = SequenceFile.createWriter(fs, conf, path, 
+                                         key.getClass(), value.getClass());
+      for (int i=0; i < DATA.length; i++) {
+        key.set(i);
+        value.set(DATA[i]);
+        writer.append(key, value);
+      }
+    } finally {
+      IOUtils.closeStream(writer);
+    }
+  }
+  
+  @Test
+  public void testReadsNocast() throws IOException {
+    pigServer.registerQuery("A = LOAD 'file:" + Util.encodeEscape(tmpFileName) 
+ 
+    "' USING org.apache.pig.piggybank.storage.SequenceFileLoader() AS (key, 
val);");
+    Iterator<?> it = pigServer.openIterator("A");
+    int tupleCount = 0;
+    while (it.hasNext()) {
+      Tuple tuple = (Tuple) it.next();
+      if (tuple == null)
+        break;
+      else {
+        System.err.println("expect:---: "+EXPECTED[tupleCount][0]);
+        assertEquals(EXPECTED[tupleCount][0], tuple.get(0).toString());
+        assertEquals(EXPECTED[tupleCount][1], tuple.get(1).toString());
+        tupleCount++;
+      }
+    }
+    assertEquals(DATA.length, tupleCount);
+  }
+  
+  @Test
+  public void testReadsStringCast() throws IOException {
+    pigServer.registerQuery("A = LOAD 'file:" + Util.encodeEscape(tmpFileName) 
+ 
+    "' USING org.apache.pig.piggybank.storage.SequenceFileLoader() AS 
(key:long, val);");
+    Iterator<?> it = pigServer.openIterator("A");
+    int tupleCount = 0;
+    while (it.hasNext()) {
+      Tuple tuple = (Tuple) it.next();
+      if (tuple == null)
+        break;
+      else {
+        assertEquals(Long.parseLong(EXPECTED[tupleCount][0]), tuple.get(0));
+        assertEquals(EXPECTED[tupleCount][1], tuple.get(1));
+        tupleCount++;
+      }
+    }
+    assertEquals(DATA.length, tupleCount);
+  }
+}


Reply via email to