ACCUMULO-1783 Clean up now dead or unnecessary code.
Project: http://git-wip-us.apache.org/repos/asf/accumulo-pig/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo-pig/commit/4160c161 Tree: http://git-wip-us.apache.org/repos/asf/accumulo-pig/tree/4160c161 Diff: http://git-wip-us.apache.org/repos/asf/accumulo-pig/diff/4160c161 Branch: refs/heads/ACCUMULO-1783 Commit: 4160c1615010a626beedc318fcaaaef06a258068 Parents: 63d29d4 Author: Josh Elser <els...@apache.org> Authored: Fri Nov 8 21:27:41 2013 -0500 Committer: Josh Elser <els...@apache.org> Committed: Fri Nov 8 21:27:41 2013 -0500 ---------------------------------------------------------------------- .../accumulo/pig/AbstractAccumuloStorage.java | 398 ++++--------------- 1 file changed, 84 insertions(+), 314 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo-pig/blob/4160c161/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java index 5faf6c6..a829d4a 100644 --- a/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java +++ b/src/main/java/org/apache/accumulo/pig/AbstractAccumuloStorage.java @@ -73,14 +73,14 @@ import org.joda.time.DateTime; */ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreFuncInterface { private static final Log LOG = LogFactory.getLog(AbstractAccumuloStorage.class); - - private static final String COLON = ":", COMMA = ",", PERIOD = "."; - private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(), OUTPUT_PREFIX = AccumuloOutputFormat.class.getSimpleName(); - + + private static final String COLON = ":", COMMA = ","; + private static final String INPUT_PREFIX = AccumuloInputFormat.class.getSimpleName(); + private Configuration conf; private RecordReader<Key,Value> reader; private RecordWriter<Text,Mutation> writer; - + String inst; String zookeepers; String user; @@ -90,27 +90,27 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF String auths; Authorizations authorizations; List<Pair<Text,Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text,Text>>(); - + String start = null; String end = null; - + int maxWriteThreads = 10; long maxMutationBufferSize = 10 * 1000 * 1000; int maxLatency = 10 * 1000; - + protected LoadStoreCaster caster; protected ResourceSchema schema; protected String contextSignature = null; - + public AbstractAccumuloStorage() {} - + @Override public Tuple getNext() throws IOException { try { // load the next pair if (!reader.nextKeyValue()) return null; - + Key key = (Key) reader.getCurrentKey(); Value value = (Value) reader.getCurrentValue(); assert key != null && value != null; @@ -119,21 +119,21 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF throw new IOException(e.getMessage()); } } - + protected abstract Tuple getTuple(Key key, Value value) throws IOException; - + @Override @SuppressWarnings("rawtypes") public InputFormat getInputFormat() { return new AccumuloInputFormat(); } - + @Override @SuppressWarnings({"unchecked", "rawtypes"}) public void prepareToRead(RecordReader reader, PigSplit split) { this.reader = reader; } - + private void setLocationFromUri(String location) throws IOException { // ex: // accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&fetch_columns=col1:cq1,col2:cq2&start=abc&end=z @@ -172,13 +172,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF String[] parts = urlParts[0].split("/+"); table = parts[1]; tableName = new Text(table); - + if (auths == null || auths.equals("")) { authorizations = new Authorizations(); } else { authorizations = new Authorizations(auths.split(COMMA)); } - + if (!StringUtils.isEmpty(columns)) { for (String cfCq : columns.split(COMMA)) { if (cfCq.contains(COLON)) { @@ -189,7 +189,7 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF } } } - + } catch (Exception e) { throw new IOException( "Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&" @@ -197,356 +197,126 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF + e.getMessage()); } } - + protected RecordWriter<Text,Mutation> getWriter() { return writer; } - + protected Map<String,String> getInputFormatEntries(Configuration conf) { return getEntries(conf, INPUT_PREFIX); } - + protected Map<String,String> getEntries(Configuration conf, String prefix) { Map<String,String> entries = new HashMap<String,String>(); - + for (Entry<String,String> entry : conf) { String key = entry.getKey(); if (key.startsWith(prefix)) { entries.put(key, entry.getValue()); } } - + return entries; } - - + @Override public void setLocation(String location, Job job) throws IOException { conf = job.getConfiguration(); setLocationFromUri(location); - + Map<String,String> entries = getInputFormatEntries(conf); - - Exception e = new Exception("setLocation"); - e.printStackTrace(System.out); - System.out.println(entries); - + for (String key : entries.keySet()) { conf.unset(key); } - - entries = getInputFormatEntries(conf); - System.out.println(entries); - + AccumuloInputFormat.setInputInfo(conf, user, password.getBytes(), table, authorizations); AccumuloInputFormat.setZooKeeperInstance(conf, inst, zookeepers); if (columnFamilyColumnQualifierPairs.size() > 0) { LOG.info("columns: " + columnFamilyColumnQualifierPairs); AccumuloInputFormat.fetchColumns(conf, columnFamilyColumnQualifierPairs); } - + Collection<Range> ranges = Collections.singleton(new Range(start, end)); - + LOG.info("Scanning Accumulo for " + ranges + " for table " + table); - + AccumuloInputFormat.setRanges(conf, ranges); - + configureInputFormat(conf); - - entries = getInputFormatEntries(conf); - System.out.println(entries); - } - - protected void configureInputFormat(Configuration conf) { - } - - protected void configureOutputFormat(Configuration conf) { - - } - + + /** + * Method to allow specific implementations to add more elements to the Configuration for reading data from Accumulo. + * + * @param conf + */ + protected void configureInputFormat(Configuration conf) {} + + /** + * Method to allow specific implementations to add more elements to the Configuration for writing data to Accumulo. + * + * @param conf + */ + protected void configureOutputFormat(Configuration conf) {} + @Override public String relativeToAbsolutePath(String location, Path curDir) throws IOException { return location; } - + @Override public void setUDFContextSignature(String signature) { this.contextSignature = signature; } - + /* StoreFunc methods */ public void setStoreFuncUDFContextSignature(String signature) { this.contextSignature = signature; - + } - + /** * Returns UDFProperties based on <code>contextSignature</code>. */ protected Properties getUDFProperties() { return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {contextSignature}); } - + public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { return relativeToAbsolutePath(location, curDir); } - + public void setStoreLocation(String location, Job job) throws IOException { conf = job.getConfiguration(); setLocationFromUri(location); -// -// Map<String,String> entries = AccumuloOutputFormat.getRelevantEntries(conf); -// -// Exception e = new Exception("setStoreLocation"); -// e.printStackTrace(System.out); -// System.out.println(entries); -// -// for (String key : entries.keySet()) { -// conf.unset(key); -// } -// -// entries = AccumuloOutputFormat.getRelevantEntries(conf); -// System.out.println(entries); + + // TODO If Pig ever uses a MultipleOutputs-esque construct, this approach will fall apart if (conf.get(AccumuloOutputFormat.class.getSimpleName() + ".configured") == null) { AccumuloOutputFormat.setOutputInfo(conf, user, password.getBytes(), true, table); AccumuloOutputFormat.setZooKeeperInstance(conf, inst, zookeepers); AccumuloOutputFormat.setMaxLatency(conf, maxLatency); AccumuloOutputFormat.setMaxMutationBufferSize(conf, maxMutationBufferSize); AccumuloOutputFormat.setMaxWriteThreads(conf, maxWriteThreads); - + LOG.info("Writing data to " + table); - + configureOutputFormat(conf); } - -// entries = AccumuloOutputFormat.getRelevantEntries(conf); -// System.out.println(entries); } - -// private boolean shouldSetInput(Map<String,String> configEntries) { -// Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries); -// -// for (Map<String,String> group : groupedConfigEntries.values()) { -// if (null != inst) { -// if (!inst.equals(group.get(INPUT_PREFIX + ".instanceName"))) { -// continue; -// } -// } else if (null != group.get(INPUT_PREFIX + ".instanceName")) { -// continue; -// } -// -// if (null != zookeepers) { -// if (!zookeepers.equals(group.get(INPUT_PREFIX + ".zooKeepers"))) { -// continue; -// } -// } else if (null != group.get(INPUT_PREFIX + ".zooKeepers")) { -// continue; -// } -// -// if (null != user) { -// if (!user.equals(group.get(INPUT_PREFIX + ".username"))) { -// continue; -// } -// } else if (null != group.get(INPUT_PREFIX + ".username")) { -// continue; -// } -// -// if (null != password) { -// if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(INPUT_PREFIX + ".password"))) { -// continue; -// } -// } else if (null != group.get(INPUT_PREFIX + ".password")) { -// continue; -// } -// -// if (null != table) { -// if (!table.equals(group.get(INPUT_PREFIX + ".tablename"))) { -// continue; -// } -// } else if (null != group.get(INPUT_PREFIX + ".tablename")) { -// continue; -// } -// -// if (null != authorizations) { -// if (!authorizations.serialize().equals(group.get(INPUT_PREFIX + ".authorizations"))) { -// continue; -// } -// } else if (null != group.get(INPUT_PREFIX + ".authorizations")) { -// continue; -// } -// -// String columnValues = group.get(INPUT_PREFIX + ".columns"); -// if (null != columnFamilyColumnQualifierPairs) { -// StringBuilder expected = new StringBuilder(128); -// for (Pair<Text,Text> column : columnFamilyColumnQualifierPairs) { -// if (0 < expected.length()) { -// expected.append(COMMA); -// } -// -// expected.append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst())))); -// if (column.getSecond() != null) -// expected.append(COLON).append(new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond())))); -// } -// -// if (!expected.toString().equals(columnValues)) { -// continue; -// } -// } else if (null != columnValues) { -// continue; -// } -// -// Range expected = new Range(start, end); -// String serializedRanges = group.get(INPUT_PREFIX + ".ranges"); -// if (null != serializedRanges) { -// try { -// // We currently only support serializing one Range into the Configuration from this Storage class -// ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedRanges.getBytes())); -// Range range = new Range(); -// range.readFields(new DataInputStream(bais)); -// -// if (!expected.equals(range)) { -// continue; -// } -// } catch (IOException e) { -// // Got an exception, they don't match -// continue; -// } -// } -// -// // We found a group of entries in the config which are (similar to) what -// // we would have set. -// return false; -// } -// -// // We didn't find any entries that seemed to match, write the config -// return true; -// } -// -// private boolean shouldSetOutput(Map<String,String> configEntries) { -// Map<Integer,Map<String,String>> groupedConfigEntries = permuteConfigEntries(configEntries); -// -// for (Map<String,String> group : groupedConfigEntries.values()) { -// if (null != inst) { -// if (!inst.equals(group.get(OUTPUT_PREFIX + ".instanceName"))) { -// continue; -// } -// } else if (null != group.get(OUTPUT_PREFIX + ".instanceName")) { -// continue; -// } -// -// if (null != zookeepers) { -// if (!zookeepers.equals(group.get(OUTPUT_PREFIX + ".zooKeepers"))) { -// continue; -// } -// } else if (null != group.get(OUTPUT_PREFIX + ".zooKeepers")) { -// continue; -// } -// -// if (null != user) { -// if (!user.equals(group.get(OUTPUT_PREFIX + ".username"))) { -// continue; -// } -// } else if (null != group.get(OUTPUT_PREFIX + ".username")) { -// continue; -// } -// -// if (null != password) { -// if (!new String(Base64.encodeBase64(password.getBytes())).equals(group.get(OUTPUT_PREFIX + ".password"))) { -// continue; -// } -// } else if (null != group.get(OUTPUT_PREFIX + ".password")) { -// continue; -// } -// -// if (null != table) { -// if (!table.equals(group.get(OUTPUT_PREFIX + ".defaulttable"))) { -// continue; -// } -// } else if (null != group.get(OUTPUT_PREFIX + ".defaulttable")) { -// continue; -// } -// -// String writeThreadsStr = group.get(OUTPUT_PREFIX + ".writethreads"); -// try { -// if (null == writeThreadsStr || maxWriteThreads != Integer.parseInt(writeThreadsStr)) { -// continue; -// } -// } catch (NumberFormatException e) { -// // Wasn't a number, so it can't match what we were expecting -// continue; -// } -// -// String mutationBufferStr = group.get(OUTPUT_PREFIX + ".maxmemory"); -// try { -// if (null == mutationBufferStr || maxMutationBufferSize != Long.parseLong(mutationBufferStr)) { -// continue; -// } -// } catch (NumberFormatException e) { -// // Wasn't a number, so it can't match what we were expecting -// continue; -// } -// -// String maxLatencyStr = group.get(OUTPUT_PREFIX + ".maxlatency"); -// try { -// if (null == maxLatencyStr || maxLatency != Long.parseLong(maxLatencyStr)) { -// continue; -// } -// } catch (NumberFormatException e) { -// // Wasn't a number, so it can't match what we were expecting -// continue; -// } -// -// // We found a group of entries in the config which are (similar to) what -// // we would have set. -// return false; -// } -// -// // We didn't find any entries that seemed to match, write the config -// return true; -// } -// -// private Map<Integer,Map<String,String>> permuteConfigEntries(Map<String,String> entries) { -// Map<Integer,Map<String,String>> groupedEntries = new HashMap<Integer,Map<String,String>>(); -// for (Entry<String,String> entry : entries.entrySet()) { -// final String key = entry.getKey(), value = entry.getValue(); -// -// if (key.endsWith(SequencedFormatHelper.CONFIGURED_SEQUENCES)) { -// continue; -// } -// -// int index = key.lastIndexOf(PERIOD); -// if (-1 != index) { -// int group = Integer.parseInt(key.substring(index + 1)); -// String name = key.substring(0, index); -// -// Map<String,String> entriesInGroup = groupedEntries.get(group); -// if (null == entriesInGroup) { -// entriesInGroup = new HashMap<String,String>(); -// groupedEntries.put(group, entriesInGroup); -// } -// -// entriesInGroup.put(name, value); -// } else { -// LOG.warn("Could not parse key: " + key); -// } -// } -// -// return groupedEntries; -// } - + @SuppressWarnings("rawtypes") public OutputFormat getOutputFormat() { return new AccumuloOutputFormat(); } - + @SuppressWarnings({"rawtypes", "unchecked"}) public void prepareToWrite(RecordWriter writer) { this.writer = writer; } - + public abstract Collection<Mutation> getMutations(Tuple tuple) throws ExecException, IOException; - + public void putNext(Tuple tuple) throws ExecException, IOException { Collection<Mutation> muts = getMutations(tuple); for (Mutation mut : muts) { @@ -557,11 +327,11 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF } } } - + public void cleanupOnFailure(String failure, Job job) {} - + public void cleanupOnSuccess(String location, Job job) {} - + @Override public void checkSchema(ResourceSchema s) throws IOException { if (!(caster instanceof LoadStoreCaster)) { @@ -571,40 +341,40 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF schema = s; getUDFProperties().setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema)); } - + protected Text tupleToText(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { Object o = tuple.get(i); byte type = schemaToType(o, i, fieldSchemas); - + return objToText(o, type); } - + protected Text objectToText(Object o, ResourceFieldSchema fieldSchema) throws IOException { byte type = schemaToType(o, fieldSchema); - + return objToText(o, type); } - + protected byte schemaToType(Object o, ResourceFieldSchema fieldSchema) { return (fieldSchema == null) ? DataType.findType(o) : fieldSchema.getType(); } - + protected byte schemaToType(Object o, int i, ResourceFieldSchema[] fieldSchemas) { return (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType(); } - + protected byte[] tupleToBytes(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { Object o = tuple.get(i); byte type = schemaToType(o, i, fieldSchemas); - + return objToBytes(o, type); - + } - + protected long objToLong(Tuple tuple, int i, ResourceFieldSchema[] fieldSchemas) throws IOException { Object o = tuple.get(i); byte type = schemaToType(o, i, fieldSchemas); - + switch (type) { case DataType.LONG: return (Long) o; @@ -629,13 +399,13 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF case DataType.BIGINTEGER: BigInteger bigintTimestamp = (BigInteger) o; long longTimestamp = bigintTimestamp.longValue(); - + BigInteger recreatedTimestamp = BigInteger.valueOf(longTimestamp); - + if (!recreatedTimestamp.equals(bigintTimestamp)) { LOG.warn("Downcasting BigInteger into Long results in a change of the original value. Was " + bigintTimestamp + " but is now " + longTimestamp); } - + return longTimestamp; case DataType.BIGDECIMAL: BigDecimal bigdecimalTimestamp = (BigDecimal) o; @@ -658,21 +428,21 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF default: LOG.error("Could not convert " + o + " of class " + o.getClass() + " into long."); throw new IOException("Could not convert " + o.getClass() + " into long"); - + } } - + protected Text objToText(Object o, byte type) throws IOException { byte[] bytes = objToBytes(o, type); - + if (null == bytes) { LOG.warn("Creating empty text from null value"); return new Text(); } - + return new Text(bytes); } - + @SuppressWarnings("unchecked") protected byte[] objToBytes(Object o, byte type) throws IOException { if (o == null) @@ -700,12 +470,12 @@ public abstract class AbstractAccumuloStorage extends LoadFunc implements StoreF return caster.toBytes((Boolean) o); case DataType.DATETIME: return caster.toBytes((DateTime) o); - + // The type conversion here is unchecked. // Relying on DataType.findType to do the right thing. case DataType.MAP: return caster.toBytes((Map<String,Object>) o); - + case DataType.NULL: return null; case DataType.TUPLE: