http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHTable.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHTable.java
 
b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHTable.java
new file mode 100644
index 0000000..e75b533
--- /dev/null
+++ 
b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHTable.java
@@ -0,0 +1,701 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase.mock;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+/**
+ * MockHTable.
+ *
+ * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217
+ */
+public class MockHTable implements HTableInterface {
+
+
+  private final String tableName;
+  private final List<String> columnFamilies = new ArrayList<>();
+  private HColumnDescriptor[] descriptors;
+  private final List<Put> putLog;
+  private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
+          = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+  private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
+    return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
+  }
+
+  private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
+    List<KeyValue> ret = new ArrayList<KeyValue>();
+    for (byte[] family : rowdata.keySet())
+      for (byte[] qualifier : rowdata.get(family).keySet()) {
+        int versionsAdded = 0;
+        for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
+          if (versionsAdded++ == maxVersions)
+            break;
+          Long timestamp = tsToVal.getKey();
+          if (timestamp < timestampStart)
+            continue;
+          if (timestamp > timestampEnd)
+            continue;
+          byte[] value = tsToVal.getValue();
+          ret.add(new KeyValue(row, family, qualifier, timestamp, value));
+        }
+      }
+    return ret;
+  }
+  public MockHTable(String tableName) {
+    this.tableName = tableName;
+    this.putLog = new ArrayList<>();
+  }
+
+  public MockHTable(String tableName, String... columnFamilies) {
+    this(tableName);
+    for(String cf : columnFamilies) {
+      addColumnFamily(cf);
+    }
+  }
+
+  public int size() {
+    return data.size();
+  }
+
+  public void addColumnFamily(String columnFamily) {
+    this.columnFamilies.add(columnFamily);
+    descriptors = new HColumnDescriptor[columnFamilies.size()];
+    int i = 0;
+    for(String cf : columnFamilies) {
+      descriptors[i++] = new HColumnDescriptor(cf);
+    }
+  }
+
+  @Override
+  public byte[] getTableName() {
+    return Bytes.toBytes(tableName);
+  }
+
+  @Override
+  public TableName getName() {
+    return TableName.valueOf(tableName);
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return HBaseConfiguration.create();
+  }
+
+  @Override
+  public HTableDescriptor getTableDescriptor() throws IOException {
+    HTableDescriptor ret = new HTableDescriptor(tableName);
+    for(HColumnDescriptor c : descriptors) {
+      ret.addFamily(c);
+    }
+    return ret;
+  }
+
+  @Override
+  public boolean exists(Get get) throws IOException {
+    if(get.getFamilyMap() == null || get.getFamilyMap().size() == 0) {
+      return data.containsKey(get.getRow());
+    } else {
+      byte[] row = get.getRow();
+      if(!data.containsKey(row)) {
+        return false;
+      }
+      for(byte[] family : get.getFamilyMap().keySet()) {
+        if(!data.get(row).containsKey(family)) {
+          return false;
+        } else {
+          return true;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Test for the existence of columns in the table, as specified by the Gets.
+   *
+   * <p>This will return an array of booleans. Each value will be true if the related Get matches
+   * one or more keys, false if not.
+   *
+   * <p>This is a server-side call so it prevents any data from being transferred to
+   * the client.
+   *
+   * @param gets the Gets
+   * @return Array of boolean.  True if the specified Get matches one or more keys, false if not.
+   * @throws IOException e
+   */
+  @Override
+  public boolean[] existsAll(List<Get> gets) throws IOException {
+    boolean[] ret = new boolean[gets.size()];
+    int i = 0;
+    for(boolean b : exists(gets)) {
+      ret[i++] = b;
+    }
+    return ret;
+  }
+
+  @Override
+  public Boolean[] exists(List<Get> list) throws IOException {
+    Boolean[] ret = new Boolean[list.size()];
+    int i = 0;
+    for(Get g : list) {
+      ret[i++] = exists(g);
+    }
+    return ret;
+  }
+
+  @Override
+  public void batch(List<? extends Row> list, Object[] objects) throws 
IOException, InterruptedException {
+    Object[] results = batch(list);
+    System.arraycopy(results, 0, objects, 0, results.length);
+  }
+
+  /**
+   * @param actions
+   * @deprecated
+   */
+  @Deprecated
+  @Override
+  public Object[] batch(List<? extends Row> actions) throws IOException, 
InterruptedException {
+    List<Result> results = new ArrayList<Result>();
+    for (Row r : actions) {
+      if (r instanceof Delete) {
+        delete((Delete) r);
+        continue;
+      }
+      if (r instanceof Put) {
+        put((Put) r);
+        continue;
+      }
+      if (r instanceof Get) {
+        results.add(get((Get) r));
+      }
+    }
+    return results.toArray();
+  }
+
+  @Override
+  public <R> void batchCallback(List<? extends Row> list, Object[] objects, 
Batch.Callback<R> callback) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+
+  }
+
+  /**
+   * @param list
+   * @param callback
+   * @deprecated
+   */
+  @Deprecated
+  @Override
+  public <R> Object[] batchCallback(List<? extends Row> list, 
Batch.Callback<R> callback) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Result get(Get get) throws IOException {
+    if (!data.containsKey(get.getRow()))
+      return new Result();
+    byte[] row = get.getRow();
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    if (!get.hasFamilies()) {
+      kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
+    } else {
+      for (byte[] family : get.getFamilyMap().keySet()){
+        if (data.get(row).get(family) == null)
+          continue;
+        NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
+        if (qualifiers == null || qualifiers.isEmpty())
+          qualifiers = data.get(row).get(family).navigableKeySet();
+        for (byte[] qualifier : qualifiers){
+          if (qualifier == null)
+            qualifier = "".getBytes();
+          if (!data.get(row).containsKey(family) ||
+                  !data.get(row).get(family).containsKey(qualifier) ||
+                  data.get(row).get(family).get(qualifier).isEmpty())
+            continue;
+          Map.Entry<Long, byte[]> timestampAndValue = 
data.get(row).get(family).get(qualifier).lastEntry();
+          kvs.add(new KeyValue(row,family, qualifier, 
timestampAndValue.getKey(), timestampAndValue.getValue()));
+        }
+      }
+    }
+    Filter filter = get.getFilter();
+    if (filter != null) {
+      filter.reset();
+      List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
+      for (KeyValue kv : kvs) {
+        if (filter.filterAllRemaining()) {
+          break;
+        }
+        if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), 
kv.getRowLength())) {
+          continue;
+        }
+        if (filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE) {
+          nkvs.add(kv);
+        }
+        // ignoring the next-key hint, which is an optimization to reduce file system IO
+      }
+      if (filter.hasFilterRow()) {
+        filter.filterRow();
+      }
+      kvs = nkvs;
+    }
+
+    return new Result(kvs);
+  }
+
+  @Override
+  public Result[] get(List<Get> list) throws IOException {
+    Result[] ret = new Result[list.size()];
+    int i = 0;
+    for(Get g : list) {
+      ret[i++] = get(g);
+    }
+    return ret;
+  }
+
+  /**
+   * @param bytes
+   * @param bytes1
+   * @deprecated
+   */
+  @Deprecated
+  @Override
+  public Result getRowOrBefore(byte[] bytes, byte[] bytes1) throws IOException 
{
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ResultScanner getScanner(Scan scan) throws IOException {
+    final List<Result> ret = new ArrayList<Result>();
+    byte[] st = scan.getStartRow();
+    byte[] sp = scan.getStopRow();
+    Filter filter = scan.getFilter();
+
+    for (byte[] row : data.keySet()){
+      // if row is equal to startRow emit it. When startRow (inclusive) and
+      // stopRow (exclusive) is the same, it should not be excluded which would
+      // happen w/o this control.
+      if (st != null && st.length > 0 &&
+              Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
+        // if row is before startRow do not emit, pass to next row
+        if (st != null && st.length > 0 &&
+                Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
+          continue;
+        // if row is equal to stopRow or after it do not emit, stop iteration
+        if (sp != null && sp.length > 0 &&
+                Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
+          break;
+      }
+
+      List<KeyValue> kvs = null;
+      if (!scan.hasFamilies()) {
+        kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), 
scan.getTimeRange().getMax(), scan.getMaxVersions());
+      } else {
+        kvs = new ArrayList<KeyValue>();
+        for (byte[] family : scan.getFamilyMap().keySet()){
+          if (data.get(row).get(family) == null)
+            continue;
+          NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
+          if (qualifiers == null || qualifiers.isEmpty())
+            qualifiers = data.get(row).get(family).navigableKeySet();
+          for (byte[] qualifier : qualifiers){
+            if (data.get(row).get(family).get(qualifier) == null)
+              continue;
+            for (Long timestamp : 
data.get(row).get(family).get(qualifier).descendingKeySet()){
+              if (timestamp < scan.getTimeRange().getMin())
+                continue;
+              if (timestamp > scan.getTimeRange().getMax())
+                continue;
+              byte[] value = 
data.get(row).get(family).get(qualifier).get(timestamp);
+              kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
+              if(kvs.size() == scan.getMaxVersions()) {
+                break;
+              }
+            }
+          }
+        }
+      }
+      if (filter != null) {
+        filter.reset();
+        List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
+        for (KeyValue kv : kvs) {
+          if (filter.filterAllRemaining()) {
+            break;
+          }
+          if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), 
kv.getRowLength())) {
+            continue;
+          }
+          Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
+          if (filterResult == Filter.ReturnCode.INCLUDE) {
+            nkvs.add(kv);
+          } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
+            break;
+          }
+          // ignoring the next-key hint, which is an optimization to reduce file system IO
+        }
+        if (filter.hasFilterRow()) {
+          filter.filterRow();
+        }
+        kvs = nkvs;
+      }
+      if (!kvs.isEmpty()) {
+        ret.add(new Result(kvs));
+      }
+    }
+
+    return new ResultScanner() {
+      private final Iterator<Result> iterator = ret.iterator();
+      @Override
+      public Iterator<Result> iterator() {
+        return iterator;
+      }
+      @Override
+      public Result[] next(int nbRows) throws IOException {
+        ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+        for(int i = 0; i < nbRows; i++) {
+          Result next = next();
+          if (next != null) {
+            resultSets.add(next);
+          } else {
+            break;
+          }
+        }
+        return resultSets.toArray(new Result[resultSets.size()]);
+      }
+      @Override
+      public Result next() throws IOException {
+        try {
+          return iterator().next();
+        } catch (NoSuchElementException e) {
+          return null;
+        }
+      }
+      @Override
+      public void close() {}
+    };
+  }
+  @Override
+  public ResultScanner getScanner(byte[] family) throws IOException {
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    return getScanner(scan);
+  }
+
+  @Override
+  public ResultScanner getScanner(byte[] family, byte[] qualifier)
+          throws IOException {
+    Scan scan = new Scan();
+    scan.addColumn(family, qualifier);
+    return getScanner(scan);
+  }
+
+  public List<Put> getPutLog() {
+    synchronized (putLog) {
+      return ImmutableList.copyOf(putLog);
+    }
+  }
+
+  public void addToPutLog(Put put) {
+    synchronized(putLog) {
+      putLog.add(put);
+    }
+  }
+
+  public void clear() {
+    synchronized (putLog) {
+      putLog.clear();
+    }
+    data.clear();
+  }
+
+  @Override
+  public void put(Put put) throws IOException {
+    addToPutLog(put);
+
+    byte[] row = put.getRow();
+    NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> 
rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], 
NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
+    for (byte[] family : put.getFamilyMap().keySet()){
+      NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = 
forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, 
byte[]>>(Bytes.BYTES_COMPARATOR));
+      for (KeyValue kv : put.getFamilyMap().get(family)){
+        kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
+        byte[] qualifier = kv.getQualifier();
+        NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, 
qualifier, new TreeMap<Long, byte[]>());
+        qualifierData.put(kv.getTimestamp(), kv.getValue());
+      }
+    }
+  }
+
+  /**
+   * Helper method to find a key in a map. If key is not found, newObject is
+   * added to map and returned
+   *
+   * @param map
+   *          map to extract value from
+   * @param key
+   *          key to look for
+   * @param newObject
+   *          set key to this if not found
+   * @return found value or newObject if not found
+   */
+  private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){
+    V data = map.get(key);
+    if (data == null){
+      data = newObject;
+      map.put(key, data);
+    }
+    return data;
+  }
+
+  @Override
+  public void put(List<Put> puts) throws IOException {
+    for (Put put : puts)
+      put(put);
+  }
+
+  @Override
+  public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, 
byte[] bytes3, Put put) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected
+   * value. If it does, it adds the put.  If the passed value is null, the check
+   * is for the lack of column (i.e. non-existence).
+   *
+   * @param row       to check
+   * @param family    column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value     the expected value
+   * @param put       data to put if check succeeds
+   * @return true if the new put was executed, false otherwise
+   * @throws IOException e
+   */
+  @Override
+  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, 
CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void delete(Delete delete) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void delete(List<Delete> list) throws IOException {
+    throw new UnsupportedOperationException();
+
+  }
+
+  @Override
+  public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, 
byte[] bytes3, Delete delete) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected
+   * value. If it does, it adds the delete.  If the passed value is null, the
+   * check is for the lack of column (i.e. non-existence).
+   *
+   * @param row       to check
+   * @param family    column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp comparison operator to use
+   * @param value     the expected value
+   * @param delete    data to delete if check succeeds
+   * @return true if the new delete was executed, false otherwise
+   * @throws IOException e
+   */
+  @Override
+  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, 
CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws 
IOException {
+    return false;
+  }
+
+  @Override
+  public void mutateRow(RowMutations rowMutations) throws IOException {
+    throw new UnsupportedOperationException();
+
+  }
+
+  @Override
+  public Result append(Append append) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Result increment(Increment increment) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, 
long l) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, 
long l, Durability durability) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * @param bytes
+   * @param bytes1
+   * @param bytes2
+   * @param l
+   * @param b
+   * @deprecated
+   */
+  @Deprecated
+  @Override
+  public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, 
long l, boolean b) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isAutoFlush() {
+    return autoflush;
+  }
+
+  @Override
+  public void flushCommits() throws IOException {
+
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  @Override
+  public CoprocessorRpcChannel coprocessorService(byte[] bytes) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> 
aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call) throws 
ServiceException, Throwable {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T extends Service, R> void coprocessorService(Class<T> aClass, 
byte[] bytes, byte[] bytes1, Batch.Call<T, R> call, Batch.Callback<R> callback) 
throws ServiceException, Throwable {
+    throw new UnsupportedOperationException();
+  }
+
+  boolean autoflush = true;
+
+  /**
+   * @param b
+   * @deprecated
+   */
+  @Deprecated
+  @Override
+  public void setAutoFlush(boolean b) {
+    autoflush = b;
+  }
+
+  @Override
+  public void setAutoFlush(boolean b, boolean b1) {
+    autoflush = b;
+  }
+
+  @Override
+  public void setAutoFlushTo(boolean b) {
+    autoflush = b;
+  }
+
+  long writeBufferSize = 0;
+  @Override
+  public long getWriteBufferSize() {
+    return writeBufferSize;
+  }
+
+  @Override
+  public void setWriteBufferSize(long l) throws IOException {
+    writeBufferSize = l;
+  }
+
+  @Override
+  public <R extends Message> Map<byte[], R> 
batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message 
message, byte[] bytes, byte[] bytes1, R r) throws ServiceException, Throwable {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <R extends Message> void 
batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message 
message, byte[] bytes, byte[] bytes1, R r, Batch.Callback<R> callback) throws 
ServiceException, Throwable {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected value.
+   * If it does, it performs the row mutations.  If the passed value is null, the check
+   * is for the lack of column (i.e. non-existence).
+   *
+   * @param row       to check
+   * @param family    column family to check
+   * @param qualifier column qualifier to check
+   * @param compareOp the comparison operator
+   * @param value     the expected value
+   * @param mutation  mutations to perform if check succeeds
+   * @return true if the new put was executed, false otherwise
+   * @throws IOException e
+   */
+  @Override
+  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, 
CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws 
IOException {
+    return false;
+  }
+}
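For context, a minimal usage sketch (not part of this patch) showing how MockHTable can back a
unit test.  It assumes JUnit 4 on the test classpath and an HBase client version where
Put#addColumn and Result#getValue are available; the table and column family names are illustrative.

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.hbase.mock.MockHTable;
import org.junit.Assert;
import org.junit.Test;

public class MockHTableUsageSketch {

  @Test
  public void readsBackWhatWasPut() throws Exception {
    // Create an in-memory table named "updates" with a single column family "t".
    MockHTable table = new MockHTable("updates", "t");

    // Write one cell, then read it back through the normal HTableInterface API.
    Put put = new Put(Bytes.toBytes("guid-1"));
    put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("v"), Bytes.toBytes("{\"field\":\"value\"}"));
    table.put(put);

    Result result = table.get(new Get(Bytes.toBytes("guid-1")));
    Assert.assertEquals("{\"field\":\"value\"}",
        Bytes.toString(result.getValue(Bytes.toBytes("t"), Bytes.toBytes("v"))));

    // Every Put is also recorded in the put log for later assertions.
    Assert.assertEquals(1, table.getPutLog().size());
  }
}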

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/README.md 
b/metron-platform/metron-indexing/README.md
index 0cf3a66..2095d0f 100644
--- a/metron-platform/metron-indexing/README.md
+++ b/metron-platform/metron-indexing/README.md
@@ -111,6 +111,30 @@ Storm console.  e.g.:
 * hdfs writer
   * disabled
 
+# Updates to Indexed Data
+
+There are clear use cases where we would want the capability to update indexed data.
+Thus far, the capabilities provided to support this use case are limited:
+* Updates to the random access index (e.g. Elasticsearch and Solr) are supported.
+* Updates to the cold storage index (e.g. HDFS) are not currently supported; however, to support the batch
+use case, updated documents are provided in a NoSQL write-ahead log (e.g. an HBase table) and a Java API
+is provided to retrieve those updates scalably (i.e. a scan-free architecture).
+
+Put simply, the random access index will always be up-to-date, but the HDFS index will need to be
+joined to the NoSQL write-ahead log to pick up current updates.
+
+## The `IndexDao` Abstraction
+
+The indices mentioned above should be pluggable by the developer, so that
+new write-ahead logs or real-time indices can be supported by providing an implementation of
+the required data access patterns.
+
+To support a new index, implement the `org.apache.metron.indexing.dao.IndexDao` abstraction
+and provide its update and search capabilities.  IndexDao instances may be composed, and updates will be performed
+against each of them in parallel.  This enables a flexible strategy for specifying the backing store for updates at runtime.
+For instance, the REST API currently supports the update functionality and may be configured with a list of
+IndexDao implementations to use for updates.
+
 # Notes on Performance Tuning
 
 Default installed Metron is untuned for production deployment.  By far
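To make the composition described in the README hunk above concrete, here is a hedged sketch (not
part of this commit) of wiring a list of IndexDao implementations through IndexDaoFactory.  The
Elasticsearch DAO class name is illustrative; any classes implementing
org.apache.metron.indexing.dao.IndexDao could be listed.

import java.util.List;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.IndexDaoFactory;

public class IndexDaoWiringSketch {

  public static IndexDao wire(AccessConfig config) throws Exception {
    // Comma-separated list of IndexDao implementations; the first class name is illustrative.
    String daoImpls = "org.apache.metron.elasticsearch.dao.ElasticsearchDao,"
                    + "org.apache.metron.indexing.dao.HBaseDao";

    // create() instantiates each implementation reflectively and calls init(config) on it.
    List<IndexDao> daos = IndexDaoFactory.create(daoImpls, config);

    // combine() returns the single DAO unchanged, or wraps several in a MultiIndexDao
    // whose update() writes to every composed DAO in parallel.
    return IndexDaoFactory.combine(daos);
  }
}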

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/pom.xml 
b/metron-platform/metron-indexing/pom.xml
index b0127bb..c64c374 100644
--- a/metron-platform/metron-indexing/pom.xml
+++ b/metron-platform/metron-indexing/pom.xml
@@ -37,6 +37,32 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
             <version>${global_hadoop_version}</version>
@@ -47,7 +73,27 @@
                 </exclusion>
             </exclusions>
             <scope>provided</scope>
+        </dependency>        
+        <dependency>
+            <groupId>com.flipkart.zjsonpatch</groupId>
+            <artifactId>zjsonpatch</artifactId>
+            <version>0.3.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
+
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
index dd68484..ddb88e5 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
@@ -17,13 +17,35 @@
  */
 package org.apache.metron.indexing.dao;
 
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Supplier;
 
 public class AccessConfig {
   private Integer maxSearchResults;
+  private Supplier<Map<String, Object>> globalConfigSupplier;
   private Map<String, String> optionalSettings = new HashMap<>();
+  private TableProvider tableProvider = null;
+
+  /**
+   * A supplier which will return the current global config.
+   * @return
+   */
+  public Supplier<Map<String, Object>> getGlobalConfigSupplier() {
+    return globalConfigSupplier;
+  }
+
+  public void setGlobalConfigSupplier(Supplier<Map<String, Object>> 
globalConfigSupplier) {
+    this.globalConfigSupplier = globalConfigSupplier;
+  }
 
+  /**
+   * The maximum number of search results to return.
+   * @return
+   */
   public Integer getMaxSearchResults() {
     return maxSearchResults;
   }
@@ -32,6 +54,10 @@ public class AccessConfig {
     this.maxSearchResults = maxSearchResults;
   }
 
+  /**
+   * Get optional settings for initializing indices.
+   * @return
+   */
   public Map<String, String> getOptionalSettings() {
     return optionalSettings;
   }
@@ -39,4 +65,17 @@ public class AccessConfig {
   public void setOptionalSettings(Map<String, String> optionalSettings) {
     this.optionalSettings = optionalSettings;
   }
+
+  /**
+   * Return the table provider to use for NoSQL DAOs.
+   * @return
+   */
+  public TableProvider getTableProvider() {
+    return tableProvider;
+  }
+
+  public void setTableProvider(TableProvider tableProvider) {
+    this.tableProvider = tableProvider;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
new file mode 100644
index 0000000..a1cf398
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.update.Document;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+
+/**
+ * The HBaseDao is an IndexDao which only supports the following actions:
+ * * Update
+ * * Get document
+ *
+ * The mechanism here is that updates to documents are added to an HBase table as a write-ahead log.
+ * The row key for a given document is its GUID, which should be sufficiently well distributed.
+ * Every new update adds a column (the column qualifier is the timestamp of the update).
+ * Upon retrieval, the most recent column is returned.
+ *
+ */
+public class HBaseDao implements IndexDao {
+  public static String HBASE_TABLE = "update.hbase.table";
+  public static String HBASE_CF = "update.hbase.cf";
+  private HTableInterface tableInterface;
+  private byte[] cf;
+  private AccessConfig config;
+  public HBaseDao() {
+
+  }
+
+  @Override
+  public synchronized SearchResponse search(SearchRequest searchRequest) 
throws InvalidSearchException {
+    return null;
+  }
+
+  @Override
+  public synchronized void init(AccessConfig config) {
+    if(this.tableInterface == null) {
+      this.config = config;
+      Map<String, Object> globalConfig = 
config.getGlobalConfigSupplier().get();
+      if(globalConfig == null) {
+        throw new IllegalStateException("Cannot find the global config.");
+      }
+      String table = (String)globalConfig.get(HBASE_TABLE);
+      String cf = (String) 
config.getGlobalConfigSupplier().get().get(HBASE_CF);
+      if(table == null || cf == null) {
+        throw new IllegalStateException("You must configure " + HBASE_TABLE + 
" and " + HBASE_CF + " in the global config.");
+      }
+      try {
+        tableInterface = 
config.getTableProvider().getTable(HBaseConfiguration.create(), table);
+        this.cf = cf.getBytes();
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to initialize HBaseDao: " + 
e.getMessage(), e);
+      }
+    }
+  }
+
+  public HTableInterface getTableInterface() {
+    if(tableInterface == null) {
+      init(config);
+    }
+    return tableInterface;
+  }
+
+  @Override
+  public synchronized Document getLatest(String guid, String sensorType) 
throws IOException {
+    Get get = new Get(guid.getBytes());
+    get.addFamily(cf);
+    Result result = getTableInterface().get(get);
+    NavigableMap<byte[], byte[]> columns = result.getFamilyMap( cf);
+    if(columns == null || columns.size() == 0) {
+      return null;
+    }
+    Map.Entry<byte[], byte[]> entry= columns.lastEntry();
+    Long ts = Bytes.toLong(entry.getKey());
+    if(entry.getValue()!= null) {
+      String json = new String(entry.getValue());
+      return new Document(json, guid, sensorType, ts);
+    }
+    else {
+      return null;
+    }
+  }
+
+  @Override
+  public synchronized void update(Document update, Optional<String> index) 
throws IOException {
+    Put put = new Put(update.getGuid().getBytes());
+    long ts = update.getTimestamp() == 
null?System.currentTimeMillis():update.getTimestamp();
+    byte[] columnQualifier = Bytes.toBytes(ts);
+    byte[] doc = JSONUtils.INSTANCE.toJSON(update.getDocument());
+    put.addColumn(cf, columnQualifier, doc);
+    getTableInterface().put(put);
+  }
+
+  @Override
+  public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> 
indices) throws IOException {
+    return null;
+  }
+
+  @Override
+  public Map<String, FieldType> getCommonColumnMetadata(List<String> indices) 
throws IOException {
+    return null;
+  }
+}
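As a hedged illustration (not part of this commit), the sketch below shows the configuration
HBaseDao expects: the global config must carry update.hbase.table and update.hbase.cf, and the
AccessConfig must supply both a global config supplier and a TableProvider.  The
MockTableProvider class is hypothetical; any TableProvider that returns an HTableInterface for
the configured table name would work.

import java.util.HashMap;
import java.util.Map;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.HBaseDao;

public class HBaseDaoWiringSketch {

  public static HBaseDao wire() {
    // The table and column family names are illustrative.
    Map<String, Object> globalConfig = new HashMap<>();
    globalConfig.put(HBaseDao.HBASE_TABLE, "updates");  // "update.hbase.table"
    globalConfig.put(HBaseDao.HBASE_CF, "t");           // "update.hbase.cf"

    AccessConfig config = new AccessConfig();
    config.setGlobalConfigSupplier(() -> globalConfig);
    config.setTableProvider(new MockTableProvider());   // hypothetical TableProvider implementation

    HBaseDao dao = new HBaseDao();
    dao.init(config);  // resolves the table and column family from the global config
    return dao;
  }
}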

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
index 31fe74e..350e402 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
@@ -17,18 +17,129 @@
  */
 package org.apache.metron.indexing.dao;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.flipkart.zjsonpatch.JsonPatch;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.metron.indexing.dao.update.PatchRequest;
+import org.apache.metron.indexing.dao.update.ReplaceRequest;
+import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
+
+import java.io.IOException;
 import org.apache.metron.indexing.dao.search.FieldType;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public interface IndexDao {
+
+  /**
+   * Return the search response for a given search request.
+   *
+   * @param searchRequest
+   * @return
+   * @throws InvalidSearchException
+   */
   SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException;
-  void init(Map<String, Object> globalConfig, AccessConfig config);
+
+  /**
+   * Initialize the DAO with the AccessConfig object.
+   * @param config
+   */
+  void init(AccessConfig config);
+
+  /**
+   * Return the latest version of a document given the GUID and the sensor type.
+   *
+   * @param guid The GUID for the document
+   * @param sensorType The sensor type of the document
+   * @return The matching Document, or null if not available.
+   * @throws IOException
+   */
+  Document getLatest(String guid, String sensorType) throws IOException;
+
+  /**
+   * Return the latest version of a document given a GetRequest.
+   * @param request The GetRequest which indicates the GUID and sensor type.
+   * @return Optionally the document (dependent upon existence in the index).
+   * @throws IOException
+   */
+  default Optional<Map<String, Object>> getLatestResult(GetRequest request) 
throws IOException {
+    Document ret = getLatest(request.getGuid(), request.getSensorType());
+    if(ret == null) {
+      return Optional.empty();
+    }
+    else {
+      return Optional.ofNullable(ret.getDocument());
+    }
+  }
+
+  /**
+   * Update a Document, optionally specifying the index where the document exists.
+   *
+   * @param update The updated document to write to the index.
+   * @param index The index where the document lives.
+   * @throws IOException
+   */
+  void update(Document update, Optional<String> index) throws IOException;
+
+
+  /**
+   * Update a document in an index given a JSON Patch (see RFC 6902 at https://tools.ietf.org/html/rfc6902)
+   * @param request The patch request
+   * @param timestamp Optionally a timestamp to set. If not specified then current time is used.
+   * @throws OriginalNotFoundException If the original is not found, then it cannot be patched.
+   * @throws IOException
+   */
+  default void patch( PatchRequest request
+                    , Optional<Long> timestamp
+                    ) throws OriginalNotFoundException, IOException {
+    Map<String, Object> latest = request.getSource();
+    if(latest == null) {
+      Document latestDoc = getLatest(request.getGuid(), 
request.getSensorType());
+      if(latestDoc.getDocument() != null) {
+        latest = latestDoc.getDocument();
+      }
+      else {
+        throw new OriginalNotFoundException("Unable to patch a document that doesn't exist and isn't specified.");
+      }
+    }
+    JsonNode originalNode = JSONUtils.INSTANCE.convert(latest, JsonNode.class);
+    JsonNode patched = JsonPatch.apply(request.getPatch(), originalNode);
+    Map<String, Object> updated = JSONUtils.INSTANCE.getMapper()
+                                           .convertValue(patched, new 
TypeReference<Map<String, Object>>() {});
+    Document d = new Document( updated
+                             , request.getGuid()
+                             , request.getSensorType()
+                             , timestamp.orElse(System.currentTimeMillis())
+                             );
+    update(d, Optional.ofNullable(request.getIndex()));
+  }
+
+  /**
+   * Replace a document in an index.
+   * @param request The replacement request.
+   * @param timestamp The timestamp (optional) of the update.  If not specified, then current time will be used.
+   * @throws IOException
+   */
+  default void replace( ReplaceRequest request
+                      , Optional<Long> timestamp
+                      ) throws IOException {
+    Document d = new Document(request.getReplacement()
+                             , request.getGuid()
+                             , request.getSensorType()
+                             , timestamp.orElse(System.currentTimeMillis())
+                             );
+    update(d, Optional.ofNullable(request.getIndex()));
+  }
+
   Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) 
throws IOException;
   Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws 
IOException;
 }
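For illustration (not part of this commit), a hedged sketch of driving the default patch()
method above with an RFC 6902 patch document.  The PatchRequest setters are assumed to mirror
the getters used by the default implementation, and the patched field and sensor type are
arbitrary examples.

import java.util.Optional;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.update.PatchRequest;

public class PatchUsageSketch {

  public static void markAsAlert(IndexDao dao, String guid) throws Exception {
    // A single-operation patch that sets the "is_alert" field of the stored document.
    String patchJson = "[{ \"op\": \"add\", \"path\": \"/is_alert\", \"value\": true }]";
    // getMapper() is the same Jackson ObjectMapper used by the default patch() method above.
    JsonNode patch = JSONUtils.INSTANCE.getMapper().readTree(patchJson);

    PatchRequest request = new PatchRequest();
    request.setGuid(guid);         // assumed setter, mirroring getGuid()
    request.setSensorType("bro");  // assumed setter, mirroring getSensorType()
    request.setPatch(patch);       // assumed setter, mirroring getPatch()

    // With no timestamp supplied, the current time is used for the updated Document.
    dao.patch(request, Optional.empty());
  }
}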

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
index 9c2de0e..e8df0b7 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
@@ -17,14 +17,42 @@
  */
 package org.apache.metron.indexing.dao;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 public class IndexDaoFactory {
-  public static IndexDao create(String daoImpl, Map<String, Object> 
globalConfig, AccessConfig config) throws ClassNotFoundException, 
NoSuchMethodException, IllegalAccessException, InvocationTargetException, 
InstantiationException {
-    Class<? extends IndexDao> clazz = (Class<? extends IndexDao>) 
Class.forName(daoImpl);
-    IndexDao instance = clazz.getConstructor().newInstance();
-    instance.init(globalConfig, config);
-    return instance;
+  public static List<IndexDao> create( String daoImpls
+                                     , AccessConfig config
+                                     ) throws ClassNotFoundException, 
NoSuchMethodException, IllegalAccessException, InvocationTargetException, 
InstantiationException
+  {
+    List<IndexDao> ret = new ArrayList<>();
+    for(String daoImpl : Splitter.on(",").split(daoImpls)) {
+      Class<? extends IndexDao> clazz = (Class<? extends IndexDao>) 
Class.forName(daoImpl);
+      IndexDao instance = clazz.getConstructor().newInstance();
+      instance.init(config);
+      ret.add(instance);
+    }
+    return ret;
+  }
+
+  public static IndexDao combine(Iterable<IndexDao> daos) throws 
ClassNotFoundException, NoSuchMethodException, IllegalAccessException, 
InvocationTargetException, InstantiationException {
+    return combine(daos, x -> x);
+  }
+
+  public static IndexDao combine(Iterable<IndexDao> daos, Function<IndexDao, 
IndexDao> daoTransformation) throws ClassNotFoundException, 
NoSuchMethodException, IllegalAccessException, InvocationTargetException, 
InstantiationException {
+    int numDaos =  Iterables.size(daos);
+    if(numDaos == 0) {
+      throw new IllegalArgumentException("Trying to combine 0 DAOs into a DAO is not a supported configuration.");
+    }
+    if( numDaos == 1) {
+      return daoTransformation.apply(Iterables.getFirst(daos, null));
+    }
+    return new MultiIndexDao(daos, daoTransformation);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexUpdateCallback.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexUpdateCallback.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexUpdateCallback.java
new file mode 100644
index 0000000..4cb4d7e
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexUpdateCallback.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+import java.io.IOException;
+
+public interface IndexUpdateCallback {
+  void postUpdate(IndexDao dao, Document doc) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
new file mode 100644
index 0000000..e9a4a9a
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.update.Document;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class MultiIndexDao implements IndexDao {
+  private List<IndexDao> indices;
+
+  public MultiIndexDao( IndexDao... composedDao) {
+    indices = new ArrayList<>();
+    Collections.addAll(indices, composedDao);
+  }
+
+  public MultiIndexDao(Iterable<IndexDao> composedDao) {
+    this.indices = new ArrayList<>();
+    Iterables.addAll(indices, composedDao);
+  }
+
+  public MultiIndexDao(Iterable<IndexDao> composedDao, Function<IndexDao, 
IndexDao> decoratorTransformation) {
+    this(Iterables.transform(composedDao, x -> 
decoratorTransformation.apply(x)));
+  }
+
+  @Override
+  public void update(final Document update, Optional<String> index) throws 
IOException {
+    List<String> exceptions =
+    indices.parallelStream().map(dao -> {
+      try {
+        dao.update(update, index);
+        return null;
+      } catch (Throwable e) {
+        return dao.getClass() + ": " + e.getMessage() + "\n" + 
ExceptionUtils.getStackTrace(e);
+      }
+    }).filter(e -> e != null).collect(Collectors.toList());
+    if(exceptions.size() > 0) {
+      throw new IOException(Joiner.on("\n").join(exceptions));
+    }
+  }
+
+  @Override
+  public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> 
in) throws IOException {
+    for(IndexDao dao : indices) {
+      Map<String, Map<String, FieldType>> r = dao.getColumnMetadata(in);
+      if(r != null) {
+        return r;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Map<String, FieldType> getCommonColumnMetadata(List<String> in) 
throws IOException {
+    for(IndexDao dao : indices) {
+      Map<String, FieldType> r = dao.getCommonColumnMetadata(in);
+      if(r != null) {
+        return r;
+      }
+    }
+    return null;
+  }
+
+  private static class DocumentContainer {
+    private Optional<Document> d = Optional.empty();
+    private Optional<Throwable> t = Optional.empty();
+    public DocumentContainer(Document d) {
+      this.d = Optional.ofNullable(d);
+    }
+    public DocumentContainer(Throwable t) {
+      this.t = Optional.ofNullable(t);
+    }
+
+    public Optional<Document> getDocument() {
+      return d;
+    }
+    public Optional<Throwable> getException() {
+      return t;
+    }
+
+  }
+
+  @Override
+  public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
+    for(IndexDao dao : indices) {
+      SearchResponse s = dao.search(searchRequest);
+      if(s != null) {
+        return s;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void init(AccessConfig config) {
+    for(IndexDao dao : indices) {
+      dao.init(config);
+    }
+  }
+
+  @Override
+  public Document getLatest(final String guid, String sensorType) throws 
IOException {
+    Document ret = null;
+    List<DocumentContainer> output =
+            indices.parallelStream().map(dao -> {
+      try {
+        return new DocumentContainer(dao.getLatest(guid, sensorType));
+      } catch (Throwable e) {
+        return new DocumentContainer(e);
+      }
+    }).collect(Collectors.toList());
+
+    List<String> error = new ArrayList<>();
+    for(DocumentContainer dc : output) {
+      if(dc.getException().isPresent()) {
+        Throwable e = dc.getException().get();
+        error.add(e.getMessage() + "\n" + ExceptionUtils.getStackTrace(e));
+      }
+      else {
+        if(dc.getDocument().isPresent()) {
+          Document d = dc.getDocument().get();
+          if(ret == null || ret.getTimestamp() < d.getTimestamp()) {
+            ret = d;
+          }
+        }
+      }
+    }
+    if(error.size() > 0) {
+      throw new IOException(Joiner.on("\n").join(error));
+    }
+    return ret;
+  }
+}
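A hedged usage sketch (not part of this commit): composing two already-initialized DAOs
directly.  getLatest() consults every composed DAO in parallel and returns the Document with
the most recent timestamp; update() throws an IOException if any composed DAO fails.

import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.MultiIndexDao;
import org.apache.metron.indexing.dao.update.Document;

public class MultiIndexDaoUsageSketch {

  public static Document latest(IndexDao randomAccessDao, IndexDao walDao,
                                String guid, String sensorType) throws Exception {
    // Both DAOs are assumed to have been init'ed already (e.g. via IndexDaoFactory.create).
    IndexDao composed = new MultiIndexDao(randomAccessDao, walDao);
    // Returns the newest version of the document across both backing indices.
    return composed.getLatest(guid, sensorType);
  }
}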

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
new file mode 100644
index 0000000..eb255dc
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+public class GetRequest {
+  String guid;
+  String sensorType;
+
+  /**
+   * The GUID of the document
+   * @return
+   */
+  public String getGuid() {
+    return guid;
+  }
+
+  public void setGuid(String guid) {
+    this.guid = guid;
+  }
+
+  /**
+   * The sensor type of the indices that you're searching.
+   * @return
+   */
+  public String getSensorType() {
+    return sensorType;
+  }
+
+  public void setSensorType(String sensorType) {
+    this.sensorType = sensorType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
index b92b36d..897f918 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
@@ -39,6 +39,10 @@ public class SearchRequest {
     facetFields = new ArrayList<>();
   }
 
+  /**
+   * The list of indices to search.
+   * @return
+   */
   public List<String> getIndices() {
     return indices;
   }
@@ -47,6 +51,10 @@ public class SearchRequest {
     this.indices = indices;
   }
 
+  /**
+   * The query to use to search the index
+   * @return
+   */
   public String getQuery() {
     return query;
   }
@@ -55,6 +63,10 @@ public class SearchRequest {
     this.query = query;
   }
 
+  /**
+   * The size of the results returned.
+   * @return
+   */
   public int getSize() {
     return size;
   }
@@ -63,6 +75,10 @@ public class SearchRequest {
     this.size = size;
   }
 
+  /**
+   * The result offset to start the search from.
+   * @return
+   */
   public int getFrom() {
     return from;
   }
@@ -71,6 +87,10 @@ public class SearchRequest {
     this.from = from;
   }
 
+  /**
+   * The fields to sort the search results by.
+   * @return
+   */
   public List<SortField> getSort() {
     return sort;
   }
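
As a quick illustration (not part of this patch), a paged SearchRequest can be assembled with only the setters visible in this diff; the index names and the Lucene-style query string below are invented:

  import java.util.Arrays;
  import org.apache.metron.indexing.dao.search.SearchRequest;

  public class SearchRequestExample {
    public static void main(String[] args) {
      SearchRequest request = new SearchRequest();
      request.setIndices(Arrays.asList("bro", "snort"));  // indices to search
      request.setQuery("ip_src_addr:192.168.1.1");        // query string (assumed Lucene-style syntax)
      request.setSize(10);                                 // page size: at most 10 results
      request.setFrom(20);                                 // offset: start at the 21st result
    }
  }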

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
index 2416357..aad489a 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
@@ -29,6 +29,10 @@ public class SearchResponse {
   private List<SearchResult> results = new ArrayList<>();
   private Map<String, Map<String, Long>> facetCounts;
 
+  /**
+   * The total number of results.
+   * @return
+   */
   public long getTotal() {
     return total;
   }
@@ -37,6 +41,10 @@ public class SearchResponse {
     this.total = total;
   }
 
+  /**
+   * The list of results.
+   * @return
+   */
   public List<SearchResult> getResults() {
     return results;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResult.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResult.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResult.java
index ae4f9bd..9c00bea 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResult.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResult.java
@@ -24,7 +24,24 @@ public class SearchResult {
   private String id;
   private Map<String, Object> source;
   private float score;
+  private String index;
 
+  /**
+   * The index that the result comes from.
+   * @return
+   */
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  /**
+   * The ID of the document from the index.
+   * @return
+   */
   public String getId() {
     return id;
   }
@@ -33,6 +50,10 @@ public class SearchResult {
     this.id = id;
   }
 
+  /**
+   * The source (the actual result).
+   * @return
+   */
   public Map<String, Object> getSource() {
     return source;
   }
@@ -41,6 +62,10 @@ public class SearchResult {
     this.source = source;
   }
 
+  /**
+   * The score from the index.
+   * @return
+   */
   public float getScore() {
     return score;
   }
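
To show how the new index field is consumed, a hedged sketch (not part of this patch) of walking a SearchResponse; the response is assumed to come from an IndexDao search call:

  import org.apache.metron.indexing.dao.search.SearchResponse;
  import org.apache.metron.indexing.dao.search.SearchResult;

  public class SearchResponseExample {
    // 'response' is assumed to be the value returned by an IndexDao search.
    public static void printHits(SearchResponse response) {
      System.out.println("total hits: " + response.getTotal());
      for (SearchResult result : response.getResults()) {
        // Each hit now carries the index it came from, alongside its id, score and source.
        System.out.println(result.getIndex() + "/" + result.getId()
            + " score=" + result.getScore()
            + " source=" + result.getSource());
      }
    }
  }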

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
new file mode 100644
index 0000000..85c079f
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao.update;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+public class Document {
+  Long timestamp;
+  Map<String, Object> document;
+  String guid;
+  String sensorType;
+
+  public Document(Map<String, Object> document, String guid, String sensorType, Long timestamp) {
+    setDocument(document);
+    setGuid(guid);
+    setTimestamp(timestamp);
+    setSensorType(sensorType);
+  }
+
+
+  public Document(String document, String guid, String sensorType, Long timestamp) throws IOException {
+    this(convertDoc(document), guid, sensorType, timestamp);
+  }
+
+  public Document(String document, String guid, String sensorType) throws IOException {
+    this( document, guid, sensorType, null);
+  }
+
+  private static Map<String, Object> convertDoc(String document) throws IOException {
+      return JSONUtils.INSTANCE.load(document, new TypeReference<Map<String, Object>>() {
+      });
+  }
+
+  public String getSensorType() {
+    return sensorType;
+  }
+
+  public void setSensorType(String sensorType) {
+    this.sensorType = sensorType;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(Long timestamp) {
+    this.timestamp = timestamp != null ? timestamp : System.currentTimeMillis();
+  }
+
+  public Map<String, Object> getDocument() {
+    return document;
+  }
+
+  public void setDocument(Map<String, Object> document) {
+    this.document = document;
+  }
+
+  public String getGuid() {
+    return guid;
+  }
+
+  public void setGuid(String guid) {
+    this.guid = guid;
+  }
+}
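
For context, a small sketch (not part of this patch) of creating a Document from a parsed map and from a raw JSON string; the guids, field names and sensor type are invented:

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.metron.indexing.dao.update.Document;

  public class DocumentExample {
    public static void main(String[] args) throws Exception {
      // From an already-parsed map; passing a null timestamp makes setTimestamp() default to the current time.
      Map<String, Object> fields = new HashMap<>();
      fields.put("guid", "bro_1");
      fields.put("ip_src_addr", "192.168.1.1");
      Document fromMap = new Document(fields, "bro_1", "bro", null);

      // From a raw JSON string; this constructor parses it into a map via JSONUtils.
      Document fromJson = new Document("{\"guid\":\"bro_2\",\"ip_src_addr\":\"10.0.0.1\"}", "bro_2", "bro");

      System.out.println(fromMap.getTimestamp() + " " + fromJson.getGuid());
    }
  }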

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/OriginalNotFoundException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/OriginalNotFoundException.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/OriginalNotFoundException.java
new file mode 100644
index 0000000..87a03ae
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/OriginalNotFoundException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.update;
+
+public class OriginalNotFoundException extends Exception {
+  public OriginalNotFoundException(String s) {
+    super(s);
+  }
+
+  public OriginalNotFoundException(String s, Exception e) {
+    super(s, e);
+  }
+}
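
As a purely hypothetical illustration (not part of this patch), a DAO-style patch path might raise this exception when neither a caller-supplied source nor an indexed original is available; the helper below is invented for the example:

  import java.util.Map;
  import org.apache.metron.indexing.dao.update.OriginalNotFoundException;

  public class PatchSourceResolver {
    // Hypothetical helper: prefer a caller-supplied source, fall back to the indexed original.
    public static Map<String, Object> resolveSource(Map<String, Object> supplied,
                                                    Map<String, Object> indexed,
                                                    String guid) throws OriginalNotFoundException {
      if (supplied != null) {
        return supplied;
      }
      if (indexed != null) {
        return indexed;
      }
      throw new OriginalNotFoundException("Unable to patch document with guid " + guid
          + ": no source supplied and no original found in the index");
    }
  }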

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/PatchRequest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/PatchRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/PatchRequest.java
new file mode 100644
index 0000000..77f5958
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/PatchRequest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.update;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.Map;
+
+public class PatchRequest {
+  JsonNode patch;
+  Map<String, Object> source;
+  String guid;
+  String sensorType;
+  String index;
+
+  /**
+   * The index of the document to be updated.  This is optional, but could result in a performance gain if specified.
+   * @return
+   */
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  /**
+   * The patch, expressed as a list of RFC 6902 patch operations.
+   * For example:
+   * <pre>
+   * [
+   *   {
+   *     "op": "add",
+   *     "path": "/project",
+   *     "value": "metron"
+   *   }
+   * ]
+   * </pre>
+   * @return
+   */
+  public JsonNode getPatch() {
+    return patch;
+  }
+
+  public void setPatch(JsonNode patch) {
+    this.patch = patch;
+  }
+
+  /**
+   * The source document.  If this is specified, then it will be used as the basis of the patch rather than the current
+   * document in the index.
+   * @return
+   */
+  public Map<String, Object> getSource() {
+    return source;
+  }
+
+  public void setSource(Map<String, Object> source) {
+    this.source = source;
+  }
+
+  /**
+   * The GUID of the document to be patched.
+   * @return
+   */
+  public String getGuid() {
+    return guid;
+  }
+
+  public void setGuid(String guid) {
+    this.guid = guid;
+  }
+
+  /**
+   * The sensor type of the document.
+   * @return
+   */
+  public String getSensorType() {
+    return sensorType;
+  }
+
+  public void setSensorType(String sensorType) {
+    this.sensorType = sensorType;
+  }
+}
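
A short sketch (not part of this patch) of filling in a PatchRequest; the RFC 6902 operations are built here with Jackson's readTree, and the guid and index name are invented:

  import com.fasterxml.jackson.databind.JsonNode;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.metron.indexing.dao.update.PatchRequest;

  public class PatchRequestExample {
    public static void main(String[] args) throws Exception {
      // A single RFC 6902 "add" operation, matching the javadoc example above.
      JsonNode patch = new ObjectMapper().readTree(
          "[{\"op\": \"add\", \"path\": \"/project\", \"value\": \"metron\"}]");

      PatchRequest request = new PatchRequest();
      request.setGuid("bro_1");                    // document to patch (invented guid)
      request.setSensorType("bro");
      request.setIndex("bro_index_2017.06.01");    // optional; may speed up the lookup (invented name)
      request.setPatch(patch);
      // setSource(...) is optional: if provided, it is patched instead of the indexed original.
    }
  }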

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/ReplaceRequest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/ReplaceRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/ReplaceRequest.java
new file mode 100644
index 0000000..96bca30
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/ReplaceRequest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.update;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.Map;
+
+public class ReplaceRequest {
+  Map<String, Object> replacement;
+  String guid;
+  String sensorType;
+  String index;
+
+  /**
+   * Return the index of the request.  This is optional, but could result in better performance if specified.
+   * @return
+   */
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  /**
+   * The sensor type of the request. This is mandatory.
+   * @return
+   */
+  public String getSensorType() {
+    return sensorType;
+  }
+
+  public void setSensorType(String sensorType) {
+    this.sensorType = sensorType;
+  }
+
+  /**
+   * The replacement document.  This is mandatory.
+   * @return
+   */
+  public Map<String, Object> getReplacement() {
+    return replacement;
+  }
+
+  public void setReplacement(Map<String, Object> replacement) {
+    this.replacement = replacement;
+  }
+
+  /**
+   * The GUID of the document to replace.  This is mandatory.
+   * @return
+   */
+  public String getGuid() {
+    return guid;
+  }
+
+  public void setGuid(String guid) {
+    this.guid = guid;
+  }
+}
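
Similarly, a minimal sketch (not part of this patch) of a ReplaceRequest; the field values are invented:

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.metron.indexing.dao.update.ReplaceRequest;

  public class ReplaceRequestExample {
    public static void main(String[] args) {
      Map<String, Object> replacement = new HashMap<>();
      replacement.put("guid", "bro_1");
      replacement.put("ip_src_addr", "192.168.1.1");
      replacement.put("is_alert", true);

      ReplaceRequest request = new ReplaceRequest();
      request.setGuid("bro_1");             // mandatory
      request.setSensorType("bro");         // mandatory
      request.setReplacement(replacement);  // mandatory: the full new document
      // setIndex(...) is optional, but avoids having to locate the document across indices.
    }
  }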

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
index ab83c7e..2d146e0 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
@@ -22,8 +22,10 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.*;
+import org.apache.metron.indexing.dao.update.Document;
 
 import java.io.IOException;
 import java.util.*;
@@ -129,11 +131,41 @@ public class InMemoryDao implements IndexDao {
   }
 
   @Override
-  public void init(Map<String, Object> globalConfig, AccessConfig config) {
+  public void init(AccessConfig config) {
     this.config = config;
   }
 
   @Override
+  public Document getLatest(String guid, String sensorType) throws IOException {
+    for(Map.Entry<String, List<String>> kv: BACKING_STORE.entrySet()) {
+      if(kv.getKey().startsWith(sensorType)) {
+        for(String doc : kv.getValue()) {
+          Map<String, Object> docParsed = parse(doc);
+          if(docParsed.getOrDefault(Constants.GUID, "").equals(guid)) {
+            return new Document(doc, guid, sensorType, 0L);
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void update(Document update, Optional<String> index) throws IOException {
+    for (Map.Entry<String, List<String>> kv : BACKING_STORE.entrySet()) {
+      if (kv.getKey().startsWith(update.getSensorType())) {
+        for (Iterator<String> it = kv.getValue().iterator(); it.hasNext(); ) {
+          String doc = it.next();
+          Map<String, Object> docParsed = parse(doc);
+          if (docParsed.getOrDefault(Constants.GUID, "").equals(update.getGuid())) {
+            it.remove();
+          }
+        }
+        kv.getValue().add(JSONUtils.INSTANCE.toJSON(update.getDocument(), true));
+      }
+    }
+  }
+  
  public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException {
     Map<String, Map<String, FieldType>> columnMetadata = new HashMap<>();
     for(String index: indices) {
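
To tie the new methods together, a hedged sketch (not part of this patch) of a read-modify-write cycle against an IndexDao, using this in-memory test implementation; the AccessConfig setup is elided and the guid/sensor values are invented:

  import java.util.Optional;
  import org.apache.metron.indexing.dao.IndexDao;
  import org.apache.metron.indexing.dao.InMemoryDao;
  import org.apache.metron.indexing.dao.update.Document;

  public class InMemoryDaoExample {
    public static void main(String[] args) throws Exception {
      IndexDao dao = new InMemoryDao();
      // dao.init(accessConfig);  // AccessConfig construction elided for brevity

      // Fetch the most recent version of a document by guid and sensor type.
      Document latest = dao.getLatest("bro_1", "bro");
      if (latest != null) {
        // Mutate the parsed document and write it back; no index hint is supplied here.
        latest.getDocument().put("is_alert", true);
        dao.update(latest, Optional.empty());
      }
    }
  }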
