ACCUMULO-1783 Reworking the storage side.

Took an approach like HBase's for the "regular" AccumuloStorage class.
Normal tuples are treated as a row, with the first entry being the
row key and subsequent entries being column values. Maps are expanded
into column:value pairs; any scalars, bags, or tuples require a column
to be specified in the AccumuloStorage constructor. Includes plenty of
unit tests for the functionality.


Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/ad03c51b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/ad03c51b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/ad03c51b

Branch: refs/heads/ACCUMULO-1783
Commit: ad03c51b3e5ee8baea76c0df6a4ca7c8df2b0606
Parents: 8c46d9b
Author: Josh Elser <els...@apache.org>
Authored: Wed Oct 30 20:24:06 2013 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Wed Oct 30 20:24:06 2013 -0400

----------------------------------------------------------------------
 pom.xml                                         |  18 +++
 .../accumulo/pig/AbstractAccumuloStorage.java   |   9 +-
 .../apache/accumulo/pig/AccumuloKVStorage.java  |   6 +-
 .../apache/accumulo/pig/AccumuloStorage.java    | 121 +++++++++++--------
 .../java/org/apache/accumulo/pig/FORMAT.java    |  25 ++++
 .../accumulo/pig/AccumuloStorageTest.java       |  18 +--
 6 files changed, 134 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 630d5e2..b096123 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,24 @@
       <artifactId>guava</artifactId>
       <version>15.0</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-minicluster</artifactId>
+      <version>1.4.4</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pig</groupId>
+      <artifactId>pigunit</artifactId>
+      <version>0.12.0</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   
 </project>

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java 
b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
index 2361dcf..c2345cc 100644
--- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java
@@ -394,7 +394,14 @@ public abstract class AbstractAccumuloStorage extends 
LoadFunc implements StoreF
   }
   
   protected Text objToText(Object o, byte type) throws IOException {
-    return new Text(objToBytes(o, type));
+    byte[] bytes = objToBytes(o, type);
+    
+    if (null == bytes) {
+      LOG.warn("Creating empty text from null value");
+      return new Text();
+    }
+    
+    return new Text(bytes);
   }
   
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java 
b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
index 8462985..13b34ce 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java
@@ -44,9 +44,9 @@ import org.apache.pig.data.TupleFactory;
  * provided:</p>
  * 
  * <ul>
- * <li>(key, colfam, colqual, value)</li>
- * <li>(key, colfam, colqual, colvis, value)</li>
- * <li>(key, colfam, colqual, colvis, timestamp, value)</li> 
+ * <li>(row, colfam, colqual, value)</li>
+ * <li>(row, colfam, colqual, colvis, value)</li>
+ * <li>(row, colfam, colqual, colvis, timestamp, value)</li> 
  * </ul>
  */
 public class AccumuloKVStorage extends AbstractAccumuloStorage {

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java 
b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
index 030b0c3..97fb44f 100644
--- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
+++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java
@@ -1,7 +1,6 @@
 package org.apache.accumulo.pig;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -37,8 +36,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage 
{
   private static final String COMMA = ",", COLON = ":";
   private static final Text EMPTY_TEXT = new Text(new byte[0]);
   
+  public static final String METADATA_SUFFIX = "_metadata";
+  
   protected final List<String> columnSpecs;
   
+  // Not sure if AccumuloStorage instances need to be thread-safe or not
+  final Text _cfHolder = new Text(), _cqHolder = new Text();
+  
   public AccumuloStorage() {
     this("");
   }
@@ -46,6 +50,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage {
   public AccumuloStorage(String columns) {
     this.caster = new Utf8StorageConverter();
     
+    // TODO It would be nice to have some other means than enumerating
+    // the CF for every column in the Tuples we're going process
     if (!StringUtils.isBlank(columns)) {
       String[] columnArray = StringUtils.split(columns, COMMA);
       columnSpecs = Lists.newArrayList(columnArray);
@@ -121,7 +127,7 @@ public class AccumuloStorage extends 
AbstractAccumuloStorage {
   @Override
   public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, 
IOException {
     final ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : 
schema.getFields();
-    
+      
     Iterator<Object> tupleIter = tuple.iterator();
     
     if (1 >= tuple.size()) {
@@ -129,25 +135,21 @@ public class AccumuloStorage extends 
AbstractAccumuloStorage {
       return Collections.emptyList();
     }
     
-    Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == 
fieldSchemas) ? null : fieldSchemas[0]));
-    
-    // TODO Can these be lifted up to members of the class instead of this 
method?
-    // Not sure if AccumuloStorage instances need to be thread-safe or not
-    final Text _cfHolder = new Text(), _cqHolder = new Text();
+    Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == 
fieldSchemas) ? null : fieldSchemas[0]));    
     
     int columnOffset = 0;
     int tupleOffset = 1;
     while (tupleIter.hasNext()) {
       Object o = tupleIter.next();
-      String cf = null;
+      String family = null;
       
       // Figure out if the user provided a specific columnfamily to use.
       if (columnOffset < columnSpecs.size()) {
-        cf = columnSpecs.get(columnOffset);
+        family = columnSpecs.get(columnOffset);
       }
       
       // Grab the type for this field
-      byte type = schemaToType(o, (null == fieldSchemas) ? null : 
fieldSchemas[tupleOffset]);
+      final byte type = schemaToType(o, (null == fieldSchemas) ? null : 
fieldSchemas[tupleOffset]);
       
       // If we have a Map, we want to treat every Entry as a column in this 
record
       // placing said column in the column family unless this instance of 
AccumuloStorage
@@ -159,53 +161,25 @@ public class AccumuloStorage extends 
AbstractAccumuloStorage {
         
         for (Entry<String,Object> entry : map.entrySet()) {
           Object entryObject = entry.getValue();
-          byte entryType = DataType.findType(entryObject);
-          
-          Value value = new Value(objToBytes(entryObject, entryType));
           
-          // If we have a CF, use it and push the Map's key down to the CQ
-          if (null != cf) {
-            int index = cf.indexOf(COLON);
+          // Treat a null value in the map as the lack of this column
+          // The input may have come from a structured source where the
+          // column could not have been omitted. We can handle the lack of the 
column
+          if (null != entryObject) {
+            byte entryType = DataType.findType(entryObject);
+            Value value = new Value(objToBytes(entryObject, entryType));
             
-            // No colon in the provided column
-            if (-1 == index) {
-              _cfHolder.set(cf);
-              _cqHolder.set(entry.getKey());
-              
-              mutation.put(_cfHolder, _cqHolder, value);
-            } else {
-              _cfHolder.set(cf.getBytes(), 0, index);
-              
-              _cqHolder.set(cf.getBytes(), index + 1, cf.length() - (index + 
1));
-              _cqHolder.append(entry.getKey().getBytes(), 0, 
entry.getKey().length());
-              
-              mutation.put(_cfHolder, _cqHolder, value);
-            }
-          } else {
-            // Just put the Map's key into the CQ
-            _cqHolder.set(entry.getKey());
-            mutation.put(EMPTY_TEXT, _cqHolder, value);
+            addColumn(mutation, family, entry.getKey(), value);
           }
         }
-      } else if (null == cf) {
-        // We don't know what column to place the value into
-        log.warn("Was provided no column family for non-Map entry in the tuple 
at offset " + tupleOffset);
       } else {
-        Value value = new Value(objToBytes(o, type));
+        byte[] bytes = objToBytes(o, type);
         
-        // We have something that isn't a Map, use the provided CF as a column 
name
-        // and then shove the value into the Value
-        int index = cf.indexOf(COLON);
-        if (-1 == index) {
-          _cqHolder.set(cf);
+        if (null != bytes) {
+          Value value = new Value(bytes);
           
-          mutation.put(EMPTY_TEXT, _cqHolder, value);
-        } else {
-          byte[] cfBytes = cf.getBytes();
-          _cfHolder.set(cfBytes, 0, index);
-          _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1));
-          
-          mutation.put(_cfHolder, _cqHolder, value);
+          // We don't have any column name from non-Maps
+          addColumn(mutation, family, null, value);
         }
       }
       
@@ -219,4 +193,51 @@ public class AccumuloStorage extends 
AbstractAccumuloStorage {
     
     return Collections.singletonList(mutation);
   }
+  
+  /**
+   * Adds column and value to the given mutation. A columnfamily and optional 
column qualifier
+   * or column qualifier prefix is pulled from {@link columnDef} with the 
family and qualifier 
+   * delimiter being a colon. If {@link columnName} is non-null, it will be 
appended to the qualifier.
+   * 
+   * If both the {@link columnDef} and {@link columnName} are null, nothing is 
added to the mutation
+   * 
+   * @param mutation
+   * @param columnDef
+   * @param columnName
+   * @param columnValue
+   */
+  protected void addColumn(Mutation mutation, String columnDef, String 
columnName, Value columnValue) {
+    if (null == columnDef && null == columnName) {
+      log.warn("Was provided no name or definition for column. Ignoring 
value");
+      return;
+    }
+    
+    if (null != columnDef) {
+      // use the provided columnDef to make a cf (with optional cq prefix)
+      int index = columnDef.indexOf(COLON);
+      if (-1 == index) {
+        _cfHolder.set(columnDef);
+        _cqHolder.clear();
+        
+      } else {
+        byte[] cfBytes = columnDef.getBytes();
+        _cfHolder.set(cfBytes, 0, index);
+        _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1)); 
+      }
+    } else {
+      _cfHolder.clear();
+      _cqHolder.clear();
+    }
+    
+    // If we have a column name (this came from a Map)
+    // append that name on the cq.
+    if (null != columnName) {
+      byte[] cnBytes = columnName.getBytes();
+      
+      // CQ is either empty or has a prefix from the columnDef
+      _cqHolder.append(cnBytes, 0, cnBytes.length);
+    }
+    
+    mutation.put(_cfHolder, _cqHolder, columnValue);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/FORMAT.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/pig/FORMAT.java 
b/src/main/java/org/apache/accumulo/pig/FORMAT.java
new file mode 100644
index 0000000..a72987a
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/pig/FORMAT.java
@@ -0,0 +1,25 @@
+package org.apache.accumulo.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+
+public class FORMAT extends EvalFunc<String> {
+
+  @Override
+  public String exec(Tuple input) throws IOException {
+    if (0 == input.size()) {
+      return null;
+    }
+    
+    final String format = input.get(0).toString();
+    Object[] args = new Object[input.size() - 1];
+    for (int i = 1; i < input.size(); i++) {
+      args[i-1] = input.get(i);
+    }
+    
+    return String.format(format, args);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java 
b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
index 10777a8..db80c47 100644
--- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
+++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java
@@ -67,8 +67,8 @@ public class AccumuloStorageTest {
     Assert.assertEquals(1, colUpdates.size());
     
     ColumnUpdate colUpdate = colUpdates.get(0);
-    Assert.assertTrue("CF not equal", 
Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
-    Assert.assertTrue("CQ not equal", 
Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+    Assert.assertTrue("CF not equal", 
Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+    Assert.assertTrue("CQ not equal", 
Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
     Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), 
"value".getBytes()));
   }
   
@@ -120,8 +120,8 @@ public class AccumuloStorageTest {
     Assert.assertEquals(4, colUpdates.size());
     
     ColumnUpdate colUpdate = colUpdates.get(0);
-    Assert.assertTrue("CF not equal", 
Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
-    Assert.assertTrue("CQ not equal", 
Arrays.equals(colUpdate.getColumnQualifier(), "col1".getBytes()));
+    Assert.assertTrue("CF not equal", 
Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes()));
+    Assert.assertTrue("CQ not equal", 
Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
     Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), 
"value1".getBytes()));
     
     colUpdate = colUpdates.get(1);
@@ -135,8 +135,8 @@ public class AccumuloStorageTest {
     Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), 
"value3".getBytes()));
     
     colUpdate = colUpdates.get(3);
-    Assert.assertTrue("CF not equal", 
Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
-    Assert.assertTrue("CQ not equal", 
Arrays.equals(colUpdate.getColumnQualifier(), "col2".getBytes()));
+    Assert.assertTrue("CF not equal", 
Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes()));
+    Assert.assertTrue("CQ not equal", 
Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
     Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), 
"value4".getBytes()));
   }
   
@@ -161,8 +161,8 @@ public class AccumuloStorageTest {
     Assert.assertEquals(1, colUpdates.size());
     
     ColumnUpdate colUpdate = colUpdates.get(0);
-    Assert.assertTrue("CF not equal", 
Arrays.equals(colUpdate.getColumnFamily(), new byte[0]));
-    Assert.assertTrue("CQ not equal", 
Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes()));
+    Assert.assertTrue("CF not equal", 
Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes()));
+    Assert.assertTrue("CQ not equal", 
Arrays.equals(colUpdate.getColumnQualifier(), new byte[0]));
     Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), 
"value1".getBytes()));
   }
   
@@ -194,7 +194,7 @@ public class AccumuloStorageTest {
     Assert.assertEquals(5, colUpdates.size());
     
     Map<Entry<String,String>,String> expectations = Maps.newHashMap();
-    expectations.put(Maps.immutableEntry("", "col"), "value1");
+    expectations.put(Maps.immutableEntry("col", ""), "value1");
     expectations.put(Maps.immutableEntry("", "mapcol1"), "mapval1");
     expectations.put(Maps.immutableEntry("", "mapcol2"), "mapval2");
     expectations.put(Maps.immutableEntry("", "mapcol3"), "mapval3");

Reply via email to