ACCUMULO-1783 Reworking the storage side. Took an approach like HBase's for the "regular" AccumuloStorage class. Normal tuples are treated as a row, with the first entry being the rowkey and subsequent entries as column values. Maps are expanded as column:value pairs; any scalars, bags, or tuples require a column to be specified in the AccumuloStorage constructor. Lots of nice unit tests for the functionality.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/ad03c51b Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/ad03c51b Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/ad03c51b Branch: refs/heads/ACCUMULO-1783 Commit: ad03c51b3e5ee8baea76c0df6a4ca7c8df2b0606 Parents: 8c46d9b Author: Josh Elser <els...@apache.org> Authored: Wed Oct 30 20:24:06 2013 -0400 Committer: Josh Elser <els...@apache.org> Committed: Wed Oct 30 20:24:06 2013 -0400 ---------------------------------------------------------------------- pom.xml | 18 +++ .../accumulo/pig/AbstractAccumuloStorage.java | 9 +- .../apache/accumulo/pig/AccumuloKVStorage.java | 6 +- .../apache/accumulo/pig/AccumuloStorage.java | 121 +++++++++++-------- .../java/org/apache/accumulo/pig/FORMAT.java | 25 ++++ .../accumulo/pig/AccumuloStorageTest.java | 18 +-- 6 files changed, 134 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 630d5e2..b096123 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,24 @@ <artifactId>guava</artifactId> <version>15.0</version> </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-minicluster</artifactId> + <version>1.4.4</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.pig</groupId> + <artifactId>pigunit</artifactId> + <version>0.12.0</version> + <scope>test</scope> + </dependency> </dependencies> </project> 
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java index 2361dcf..c2345cc 100644 --- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java +++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java @@ -394,7 +394,14 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF } protected Text objToText(Object o, byte type) throws IOException { - return new Text(objToBytes(o, type)); + byte[] bytes = objToBytes(o, type); + + if (null == bytes) { + LOG.warn("Creating empty text from null value"); + return new Text(); + } + + return new Text(bytes); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java index 8462985..13b34ce 100644 --- a/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java +++ b/src/main/java/org/apache/accumulo/pig/AccumuloKVStorage.java @@ -44,9 +44,9 @@ import org.apache.pig.data.TupleFactory; * provided:</p> * * <ul> - * <li>(key, colfam, colqual, value)</li> - * <li>(key, colfam, colqual, colvis, value)</li> - * <li>(key, colfam, colqual, colvis, timestamp, value)</li> + * <li>(row, colfam, colqual, value)</li> + * <li>(row, colfam, colqual, colvis, value)</li> + * <li>(row, colfam, colqual, colvis, timestamp, value)</li> * </ul> */ public class AccumuloKVStorage extends AbstractAccumuloStorage { 
http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java index 030b0c3..97fb44f 100644 --- a/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java +++ b/src/main/java/org/apache/accumulo/pig/AccumuloStorage.java @@ -1,7 +1,6 @@ package org.apache.accumulo.pig; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -37,8 +36,13 @@ public class AccumuloStorage extends AbstractAccumuloStorage { private static final String COMMA = ",", COLON = ":"; private static final Text EMPTY_TEXT = new Text(new byte[0]); + public static final String METADATA_SUFFIX = "_metadata"; + protected final List<String> columnSpecs; + // Not sure if AccumuloStorage instances need to be thread-safe or not + final Text _cfHolder = new Text(), _cqHolder = new Text(); + public AccumuloStorage() { this(""); } @@ -46,6 +50,8 @@ public class AccumuloStorage extends AbstractAccumuloStorage { public AccumuloStorage(String columns) { this.caster = new Utf8StorageConverter(); + // TODO It would be nice to have some other means than enumerating + // the CF for every column in the Tuples we're going process if (!StringUtils.isBlank(columns)) { String[] columnArray = StringUtils.split(columns, COMMA); columnSpecs = Lists.newArrayList(columnArray); @@ -121,7 +127,7 @@ public class AccumuloStorage extends AbstractAccumuloStorage { @Override public Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException { final ResourceFieldSchema[] fieldSchemas = (schema == null) ? 
null : schema.getFields(); - + Iterator<Object> tupleIter = tuple.iterator(); if (1 >= tuple.size()) { @@ -129,25 +135,21 @@ public class AccumuloStorage extends AbstractAccumuloStorage { return Collections.emptyList(); } - Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0])); - - // TODO Can these be lifted up to members of the class instead of this method? - // Not sure if AccumuloStorage instances need to be thread-safe or not - final Text _cfHolder = new Text(), _cqHolder = new Text(); + Mutation mutation = new Mutation(objectToText(tupleIter.next(), (null == fieldSchemas) ? null : fieldSchemas[0])); int columnOffset = 0; int tupleOffset = 1; while (tupleIter.hasNext()) { Object o = tupleIter.next(); - String cf = null; + String family = null; // Figure out if the user provided a specific columnfamily to use. if (columnOffset < columnSpecs.size()) { - cf = columnSpecs.get(columnOffset); + family = columnSpecs.get(columnOffset); } // Grab the type for this field - byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]); + final byte type = schemaToType(o, (null == fieldSchemas) ? null : fieldSchemas[tupleOffset]); // If we have a Map, we want to treat every Entry as a column in this record // placing said column in the column family unless this instance of AccumuloStorage @@ -159,53 +161,25 @@ public class AccumuloStorage extends AbstractAccumuloStorage { for (Entry<String,Object> entry : map.entrySet()) { Object entryObject = entry.getValue(); - byte entryType = DataType.findType(entryObject); - - Value value = new Value(objToBytes(entryObject, entryType)); - // If we have a CF, use it and push the Map's key down to the CQ - if (null != cf) { - int index = cf.indexOf(COLON); + // Treat a null value in the map as the lack of this column + // The input may have come from a structured source where the + // column could not have been omitted. 
We can handle the lack of the column + if (null != entryObject) { + byte entryType = DataType.findType(entryObject); + Value value = new Value(objToBytes(entryObject, entryType)); - // No colon in the provided column - if (-1 == index) { - _cfHolder.set(cf); - _cqHolder.set(entry.getKey()); - - mutation.put(_cfHolder, _cqHolder, value); - } else { - _cfHolder.set(cf.getBytes(), 0, index); - - _cqHolder.set(cf.getBytes(), index + 1, cf.length() - (index + 1)); - _cqHolder.append(entry.getKey().getBytes(), 0, entry.getKey().length()); - - mutation.put(_cfHolder, _cqHolder, value); - } - } else { - // Just put the Map's key into the CQ - _cqHolder.set(entry.getKey()); - mutation.put(EMPTY_TEXT, _cqHolder, value); + addColumn(mutation, family, entry.getKey(), value); } } - } else if (null == cf) { - // We don't know what column to place the value into - log.warn("Was provided no column family for non-Map entry in the tuple at offset " + tupleOffset); } else { - Value value = new Value(objToBytes(o, type)); + byte[] bytes = objToBytes(o, type); - // We have something that isn't a Map, use the provided CF as a column name - // and then shove the value into the Value - int index = cf.indexOf(COLON); - if (-1 == index) { - _cqHolder.set(cf); + if (null != bytes) { + Value value = new Value(bytes); - mutation.put(EMPTY_TEXT, _cqHolder, value); - } else { - byte[] cfBytes = cf.getBytes(); - _cfHolder.set(cfBytes, 0, index); - _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1)); - - mutation.put(_cfHolder, _cqHolder, value); + // We don't have any column name from non-Maps + addColumn(mutation, family, null, value); } } @@ -219,4 +193,51 @@ public class AccumuloStorage extends AbstractAccumuloStorage { return Collections.singletonList(mutation); } + + /** + * Adds column and value to the given mutation. 
A columnfamily and optional column qualifier + * or column qualifier prefix is pulled from {@link columnDef} with the family and qualifier + * delimiter being a colon. If {@link columnName} is non-null, it will be appended to the qualifier. + * + * If both the {@link columnDef} and {@link columnName} are null, nothing is added to the mutation + * + * @param mutation + * @param columnDef + * @param columnName + * @param columnValue + */ + protected void addColumn(Mutation mutation, String columnDef, String columnName, Value columnValue) { + if (null == columnDef && null == columnName) { + log.warn("Was provided no name or definition for column. Ignoring value"); + return; + } + + if (null != columnDef) { + // use the provided columnDef to make a cf (with optional cq prefix) + int index = columnDef.indexOf(COLON); + if (-1 == index) { + _cfHolder.set(columnDef); + _cqHolder.clear(); + + } else { + byte[] cfBytes = columnDef.getBytes(); + _cfHolder.set(cfBytes, 0, index); + _cqHolder.set(cfBytes, index + 1, cfBytes.length - (index + 1)); + } + } else { + _cfHolder.clear(); + _cqHolder.clear(); + } + + // If we have a column name (this came from a Map) + // append that name on the cq. 
+ if (null != columnName) { + byte[] cnBytes = columnName.getBytes(); + + // CQ is either empty or has a prefix from the columnDef + _cqHolder.append(cnBytes, 0, cnBytes.length); + } + + mutation.put(_cfHolder, _cqHolder, columnValue); + } } http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/main/java/org/apache/accumulo/pig/FORMAT.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/FORMAT.java b/src/main/java/org/apache/accumulo/pig/FORMAT.java new file mode 100644 index 0000000..a72987a --- /dev/null +++ b/src/main/java/org/apache/accumulo/pig/FORMAT.java @@ -0,0 +1,25 @@ +package org.apache.accumulo.pig; + +import java.io.IOException; + +import org.apache.pig.EvalFunc; +import org.apache.pig.data.Tuple; + +public class FORMAT extends EvalFunc<String> { + + @Override + public String exec(Tuple input) throws IOException { + if (0 == input.size()) { + return null; + } + + final String format = input.get(0).toString(); + Object[] args = new Object[input.size() - 1]; + for (int i = 1; i < input.size(); i++) { + args[i-1] = input.get(i); + } + + return String.format(format, args); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/ad03c51b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java index 10777a8..db80c47 100644 --- a/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java +++ b/src/test/java/org/apache/accumulo/pig/AccumuloStorageTest.java @@ -67,8 +67,8 @@ public class AccumuloStorageTest { Assert.assertEquals(1, colUpdates.size()); ColumnUpdate colUpdate = colUpdates.get(0); - Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0])); - Assert.assertTrue("CQ not equal", 
Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes())); + Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes())); + Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0])); Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value".getBytes())); } @@ -120,8 +120,8 @@ public class AccumuloStorageTest { Assert.assertEquals(4, colUpdates.size()); ColumnUpdate colUpdate = colUpdates.get(0); - Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0])); - Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col1".getBytes())); + Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col1".getBytes())); + Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0])); Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes())); colUpdate = colUpdates.get(1); @@ -135,8 +135,8 @@ public class AccumuloStorageTest { Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value3".getBytes())); colUpdate = colUpdates.get(3); - Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0])); - Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), "col2".getBytes())); + Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col2".getBytes())); + Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0])); Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value4".getBytes())); } @@ -161,8 +161,8 @@ public class AccumuloStorageTest { Assert.assertEquals(1, colUpdates.size()); ColumnUpdate colUpdate = colUpdates.get(0); - Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), new byte[0])); - Assert.assertTrue("CQ not equal", 
Arrays.equals(colUpdate.getColumnQualifier(), "col".getBytes())); + Assert.assertTrue("CF not equal", Arrays.equals(colUpdate.getColumnFamily(), "col".getBytes())); + Assert.assertTrue("CQ not equal", Arrays.equals(colUpdate.getColumnQualifier(), new byte[0])); Assert.assertTrue("Values not equal", Arrays.equals(colUpdate.getValue(), "value1".getBytes())); } @@ -194,7 +194,7 @@ public class AccumuloStorageTest { Assert.assertEquals(5, colUpdates.size()); Map<Entry<String,String>,String> expectations = Maps.newHashMap(); - expectations.put(Maps.immutableEntry("", "col"), "value1"); + expectations.put(Maps.immutableEntry("col", ""), "value1"); expectations.put(Maps.immutableEntry("", "mapcol1"), "mapval1"); expectations.put(Maps.immutableEntry("", "mapcol2"), "mapval2"); expectations.put(Maps.immutableEntry("", "mapcol3"), "mapval3");