http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHTable.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHTable.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHTable.java new file mode 100644 index 0000000..e75b533 --- /dev/null +++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHTable.java @@ -0,0 +1,701 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.hbase.mock; + + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.metron.hbase.TableProvider; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.NoSuchElementException; +import java.util.TreeMap; + +/** + * MockHTable. 
+ * + * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217 + */ +public class MockHTable implements HTableInterface { + + + private final String tableName; + private final List<String> columnFamilies = new ArrayList<>(); + private HColumnDescriptor[] descriptors; + private final List<Put> putLog; + private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data + = new TreeMap<>(Bytes.BYTES_COMPARATOR); + + private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) { + return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions); + } + + private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) { + List<KeyValue> ret = new ArrayList<KeyValue>(); + for (byte[] family : rowdata.keySet()) + for (byte[] qualifier : rowdata.get(family).keySet()) { + int versionsAdded = 0; + for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) { + if (versionsAdded++ == maxVersions) + break; + Long timestamp = tsToVal.getKey(); + if (timestamp < timestampStart) + continue; + if (timestamp > timestampEnd) + continue; + byte[] value = tsToVal.getValue(); + ret.add(new KeyValue(row, family, qualifier, timestamp, value)); + } + } + return ret; + } + public MockHTable(String tableName) { + this.tableName = tableName; + this.putLog = new ArrayList<>(); + } + + public MockHTable(String tableName, String... 
columnFamilies) { + this(tableName); + for(String cf : columnFamilies) { + addColumnFamily(cf); + } + } + + public int size() { + return data.size(); + } + + public void addColumnFamily(String columnFamily) { + this.columnFamilies.add(columnFamily); + descriptors = new HColumnDescriptor[columnFamilies.size()]; + int i = 0; + for(String cf : columnFamilies) { + descriptors[i++] = new HColumnDescriptor(cf); + } + } + + @Override + public byte[] getTableName() { + return Bytes.toBytes(tableName); + } + + @Override + public TableName getName() { + return TableName.valueOf(tableName); + } + + @Override + public Configuration getConfiguration() { + return HBaseConfiguration.create(); + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + HTableDescriptor ret = new HTableDescriptor(tableName); + for(HColumnDescriptor c : descriptors) { + ret.addFamily(c); + } + return ret; + } + + @Override + public boolean exists(Get get) throws IOException { + if(get.getFamilyMap() == null || get.getFamilyMap().size() == 0) { + return data.containsKey(get.getRow()); + } else { + byte[] row = get.getRow(); + if(!data.containsKey(row)) { + return false; + } + for(byte[] family : get.getFamilyMap().keySet()) { + if(!data.get(row).containsKey(family)) { + return false; + } else { + return true; + } + } + return true; + } + } + + /** + * Test for the existence of columns in the table, as specified by the Gets. + * + * <p>This will return an array of booleans. Each value will be true if the related Get matches + * one or more keys, false if not. + * + * <p>This is a server-side call so it prevents any data from being transferred to + * the client. + * + * @param gets the Gets + * @return Array of boolean. True if the specified Get matches one or more keys, false if not. 
+ * @throws IOException e + */ + @Override + public boolean[] existsAll(List<Get> gets) throws IOException { + boolean[] ret = new boolean[gets.size()]; + int i = 0; + for(boolean b : exists(gets)) { + ret[i++] = b; + } + return ret; + } + + @Override + public Boolean[] exists(List<Get> list) throws IOException { + Boolean[] ret = new Boolean[list.size()]; + int i = 0; + for(Get g : list) { + ret[i++] = exists(g); + } + return ret; + } + + @Override + public void batch(List<? extends Row> list, Object[] objects) throws IOException, InterruptedException { + Object[] results = batch(list); + System.arraycopy(results, 0, objects, 0, results.length); + } + + /** + * @param actions + * @deprecated + */ + @Deprecated + @Override + public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { + List<Result> results = new ArrayList<Result>(); + for (Row r : actions) { + if (r instanceof Delete) { + delete((Delete) r); + continue; + } + if (r instanceof Put) { + put((Put) r); + continue; + } + if (r instanceof Get) { + results.add(get((Get) r)); + } + } + return results.toArray(); + } + + @Override + public <R> void batchCallback(List<? extends Row> list, Object[] objects, Batch.Callback<R> callback) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + + } + + /** + * @param list + * @param callback + * @deprecated + */ + @Deprecated + @Override + public <R> Object[] batchCallback(List<? 
extends Row> list, Batch.Callback<R> callback) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public Result get(Get get) throws IOException { + if (!data.containsKey(get.getRow())) + return new Result(); + byte[] row = get.getRow(); + List<KeyValue> kvs = new ArrayList<KeyValue>(); + if (!get.hasFamilies()) { + kvs = toKeyValue(row, data.get(row), get.getMaxVersions()); + } else { + for (byte[] family : get.getFamilyMap().keySet()){ + if (data.get(row).get(family) == null) + continue; + NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family); + if (qualifiers == null || qualifiers.isEmpty()) + qualifiers = data.get(row).get(family).navigableKeySet(); + for (byte[] qualifier : qualifiers){ + if (qualifier == null) + qualifier = "".getBytes(); + if (!data.get(row).containsKey(family) || + !data.get(row).get(family).containsKey(qualifier) || + data.get(row).get(family).get(qualifier).isEmpty()) + continue; + Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry(); + kvs.add(new KeyValue(row,family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue())); + } + } + } + Filter filter = get.getFilter(); + if (filter != null) { + filter.reset(); + List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size()); + for (KeyValue kv : kvs) { + if (filter.filterAllRemaining()) { + break; + } + if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) { + continue; + } + if (filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE) { + nkvs.add(kv); + } + // ignoring next key hint which is a optimization to reduce file system IO + } + if (filter.hasFilterRow()) { + filter.filterRow(); + } + kvs = nkvs; + } + + return new Result(kvs); + } + + @Override + public Result[] get(List<Get> list) throws IOException { + Result[] ret = new Result[list.size()]; + int i = 0; + for(Get g : list) { + ret[i++] = get(g); + } + return ret; + } + + /** + * 
@param bytes + * @param bytes1 + * @deprecated + */ + @Deprecated + @Override + public Result getRowOrBefore(byte[] bytes, byte[] bytes1) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + final List<Result> ret = new ArrayList<Result>(); + byte[] st = scan.getStartRow(); + byte[] sp = scan.getStopRow(); + Filter filter = scan.getFilter(); + + for (byte[] row : data.keySet()){ + // if row is equal to startRow emit it. When startRow (inclusive) and + // stopRow (exclusive) is the same, it should not be excluded which would + // happen w/o this control. + if (st != null && st.length > 0 && + Bytes.BYTES_COMPARATOR.compare(st, row) != 0) { + // if row is before startRow do not emit, pass to next row + if (st != null && st.length > 0 && + Bytes.BYTES_COMPARATOR.compare(st, row) > 0) + continue; + // if row is equal to stopRow or after it do not emit, stop iteration + if (sp != null && sp.length > 0 && + Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0) + break; + } + + List<KeyValue> kvs = null; + if (!scan.hasFamilies()) { + kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions()); + } else { + kvs = new ArrayList<KeyValue>(); + for (byte[] family : scan.getFamilyMap().keySet()){ + if (data.get(row).get(family) == null) + continue; + NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family); + if (qualifiers == null || qualifiers.isEmpty()) + qualifiers = data.get(row).get(family).navigableKeySet(); + for (byte[] qualifier : qualifiers){ + if (data.get(row).get(family).get(qualifier) == null) + continue; + for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()){ + if (timestamp < scan.getTimeRange().getMin()) + continue; + if (timestamp > scan.getTimeRange().getMax()) + continue; + byte[] value = data.get(row).get(family).get(qualifier).get(timestamp); + kvs.add(new 
KeyValue(row, family, qualifier, timestamp, value)); + if(kvs.size() == scan.getMaxVersions()) { + break; + } + } + } + } + } + if (filter != null) { + filter.reset(); + List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size()); + for (KeyValue kv : kvs) { + if (filter.filterAllRemaining()) { + break; + } + if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) { + continue; + } + Filter.ReturnCode filterResult = filter.filterKeyValue(kv); + if (filterResult == Filter.ReturnCode.INCLUDE) { + nkvs.add(kv); + } else if (filterResult == Filter.ReturnCode.NEXT_ROW) { + break; + } + // ignoring next key hint which is a optimization to reduce file system IO + } + if (filter.hasFilterRow()) { + filter.filterRow(); + } + kvs = nkvs; + } + if (!kvs.isEmpty()) { + ret.add(new Result(kvs)); + } + } + + return new ResultScanner() { + private final Iterator<Result> iterator = ret.iterator(); + @Override + public Iterator<Result> iterator() { + return iterator; + } + @Override + public Result[] next(int nbRows) throws IOException { + ArrayList<Result> resultSets = new ArrayList<Result>(nbRows); + for(int i = 0; i < nbRows; i++) { + Result next = next(); + if (next != null) { + resultSets.add(next); + } else { + break; + } + } + return resultSets.toArray(new Result[resultSets.size()]); + } + @Override + public Result next() throws IOException { + try { + return iterator().next(); + } catch (NoSuchElementException e) { + return null; + } + } + @Override + public void close() {} + }; + } + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + Scan scan = new Scan(); + scan.addFamily(family); + return getScanner(scan); + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) + throws IOException { + Scan scan = new Scan(); + scan.addColumn(family, qualifier); + return getScanner(scan); + } + + public List<Put> getPutLog() { + synchronized (putLog) { + return ImmutableList.copyOf(putLog); + } + } + + 
public void addToPutLog(Put put) { + synchronized(putLog) { + putLog.add(put); + } + } + + public void clear() { + synchronized (putLog) { + putLog.clear(); + } + data.clear(); + } + + @Override + public void put(Put put) throws IOException { + addToPutLog(put); + + byte[] row = put.getRow(); + NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR)); + for (byte[] family : put.getFamilyMap().keySet()){ + NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR)); + for (KeyValue kv : put.getFamilyMap().get(family)){ + kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis())); + byte[] qualifier = kv.getQualifier(); + NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>()); + qualifierData.put(kv.getTimestamp(), kv.getValue()); + } + } + } + + /** + * Helper method to find a key in a map. If key is not found, newObject is + * added to map and returned + * + * @param map + * map to extract value from + * @param key + * key to look for + * @param newObject + * set key to this if not found + * @return found value or newObject if not found + */ + private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){ + V data = map.get(key); + if (data == null){ + data = newObject; + map.put(key, data); + } + return data; + } + + @Override + public void put(List<Put> puts) throws IOException { + for (Put put : puts) + put(put); + } + + @Override + public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Put put) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected + * value. If it does, it adds the put. 
If the passed value is null, the check + * is for the lack of column (ie: non-existance) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp comparison operator to use + * @param value the expected value + * @param put data to put if check succeeds + * @return true if the new put was executed, false otherwise + * @throws IOException e + */ + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException { + return false; + } + + @Override + public void delete(Delete delete) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void delete(List<Delete> list) throws IOException { + throw new UnsupportedOperationException(); + + } + + @Override + public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Delete delete) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected + * value. If it does, it adds the delete. 
If the passed value is null, the + * check is for the lack of column (ie: non-existance) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp comparison operator to use + * @param value the expected value + * @param delete data to delete if check succeeds + * @return true if the new delete was executed, false otherwise + * @throws IOException e + */ + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException { + return false; + } + + @Override + public void mutateRow(RowMutations rowMutations) throws IOException { + throw new UnsupportedOperationException(); + + } + + @Override + public Result append(Append append) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Result increment(Increment increment) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, Durability durability) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * @param bytes + * @param bytes1 + * @param bytes2 + * @param l + * @param b + * @deprecated + */ + @Deprecated + @Override + public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isAutoFlush() { + return autoflush; + } + + @Override + public void flushCommits() throws IOException { + + } + + @Override + public void close() throws IOException { + + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] bytes) { + throw new 
UnsupportedOperationException(); + } + + @Override + public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call) throws ServiceException, Throwable { + throw new UnsupportedOperationException(); + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call, Batch.Callback<R> callback) throws ServiceException, Throwable { + throw new UnsupportedOperationException(); + } + + boolean autoflush = true; + + /** + * @param b + * @deprecated + */ + @Deprecated + @Override + public void setAutoFlush(boolean b) { + autoflush = b; + } + + @Override + public void setAutoFlush(boolean b, boolean b1) { + autoflush = b; + } + + @Override + public void setAutoFlushTo(boolean b) { + autoflush = b; + } + + long writeBufferSize = 0; + @Override + public long getWriteBufferSize() { + return writeBufferSize; + } + + @Override + public void setWriteBufferSize(long l) throws IOException { + writeBufferSize = l; + } + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) throws ServiceException, Throwable { + throw new UnsupportedOperationException(); + } + + @Override + public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r, Batch.Callback<R> callback) throws ServiceException, Throwable { + throw new UnsupportedOperationException(); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. + * If it does, it performs the row mutations. 
If the passed value is null, the check + * is for the lack of column (ie: non-existence) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp the comparison operator + * @param value the expected value + * @param mutation mutations to perform if check succeeds + * @return true if the new put was executed, false otherwise + * @throws IOException e + */ + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException { + return false; + } +}
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md index 0cf3a66..2095d0f 100644 --- a/metron-platform/metron-indexing/README.md +++ b/metron-platform/metron-indexing/README.md @@ -111,6 +111,30 @@ Storm console. e.g.: * hdfs writer * disabled +# Updates to Indexed Data + +There are clear use cases where we would want the ability to update indexed data. +Thus far, we provide limited capabilities to support this use case: +* Updates to the random access index (e.g. Elasticsearch and Solr) should be supported +* Updates to the cold storage index (e.g. HDFS) are not currently supported; however, to support the batch +use case, updated documents will be provided in a NoSQL write-ahead log (e.g. an HBase table) and a Java API +will be provided to retrieve those updates scalably (i.e. a scan-free architecture). + +Put simply, the random access index will always be up to date, but the HDFS index will need to be +joined to the NoSQL write-ahead log to get current updates. + +## The `IndexDao` Abstraction + +The indices mentioned above as part of the update capability should be pluggable by the developer so that +new write-ahead logs or real-time indices can be supported by providing an implementation supporting +the data access patterns. + +To support a new index, one would need to implement the `org.apache.metron.indexing.dao.IndexDao` abstraction +and provide update and search capabilities. IndexDaos may be composed, and updates will be performed +in parallel. This enables a flexible strategy for specifying your backing store for updates at runtime. +For instance, the REST API currently supports the update functionality and may be configured with a list of +IndexDao implementations to use to support the updates.
+ # Notes on Performance Tuning Default installed Metron is untuned for production deployment. By far http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/pom.xml b/metron-platform/metron-indexing/pom.xml index b0127bb..c64c374 100644 --- a/metron-platform/metron-indexing/pom.xml +++ b/metron-platform/metron-indexing/pom.xml @@ -37,6 +37,32 @@ <version>${project.parent.version}</version> </dependency> <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${global_hbase_version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${global_hadoop_version}</version> @@ -47,7 +73,27 @@ </exclusion> </exclusions> <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.flipkart.zjsonpatch</groupId> + <artifactId>zjsonpatch</artifactId> + <version>0.3.1</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + </exclusions> </dependency> + <dependency> 
<groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java index dd68484..ddb88e5 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java @@ -17,13 +17,35 @@ */ package org.apache.metron.indexing.dao; +import org.apache.metron.hbase.HTableProvider; +import org.apache.metron.hbase.TableProvider; + import java.util.HashMap; import java.util.Map; +import java.util.function.Supplier; public class AccessConfig { private Integer maxSearchResults; + private Supplier<Map<String, Object>> globalConfigSupplier; private Map<String, String> optionalSettings = new HashMap<>(); + private TableProvider tableProvider = null; + + /** + * A supplier which will return the current global config. + * @return + */ + public Supplier<Map<String, Object>> getGlobalConfigSupplier() { + return globalConfigSupplier; + } + + public void setGlobalConfigSupplier(Supplier<Map<String, Object>> globalConfigSupplier) { + this.globalConfigSupplier = globalConfigSupplier; + } + /** + * The maximum search result. + * @return + */ public Integer getMaxSearchResults() { return maxSearchResults; } @@ -32,6 +54,10 @@ public class AccessConfig { this.maxSearchResults = maxSearchResults; } + /** + * Get optional settings for initializing indices. 
+ * @return + */ public Map<String, String> getOptionalSettings() { return optionalSettings; } @@ -39,4 +65,17 @@ public class AccessConfig { public void setOptionalSettings(Map<String, String> optionalSettings) { this.optionalSettings = optionalSettings; } + + /** + * Return the table provider to use for NoSql DAOs + * @return + */ + public TableProvider getTableProvider() { + return tableProvider; + } + + public void setTableProvider(TableProvider tableProvider) { + this.tableProvider = tableProvider; + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java new file mode 100644 index 0000000..a1cf398 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.indexing.dao; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.update.Document; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; + +/** + * The HBaseDao is an index dao which only supports the following actions: + * * Update + * * Get document + * + * The mechanism here is that updates to documents will be added to a HBase Table as a write-ahead log. + * The Key for a row supporting a given document will be the GUID, which should be sufficiently distributed. + * Every new update will have a column added (column qualifier will be the timestamp of the update). + * Upon retrieval, the most recent column will be returned. 
+ *
+ */
+public class HBaseDao implements IndexDao {
+  public static String HBASE_TABLE = "update.hbase.table"; // global-config key naming the HBase table to use
+  public static String HBASE_CF = "update.hbase.cf"; // global-config key naming the column family to use
+  private HTableInterface tableInterface; // obtained from the TableProvider in init()
+  private byte[] cf; // UTF-8 bytes of the configured column family
+  private AccessConfig config; // remembered so getTableInterface() can re-run init()
+  public HBaseDao() {
+
+  }
+
+  @Override
+  public synchronized SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { // search is not supported by this DAO: always null
+    return null;
+  }
+
+  @Override
+  public synchronized void init(AccessConfig config) { // no-op once a table has been obtained
+    if(this.tableInterface == null) {
+      this.config = config;
+      Map<String, Object> globalConfig = config.getGlobalConfigSupplier().get();
+      if(globalConfig == null) {
+        throw new IllegalStateException("Cannot find the global config.");
+      }
+      String table = (String)globalConfig.get(HBASE_TABLE);
+      String cf = (String) globalConfig.get(HBASE_CF); // reuse the snapshot checked above instead of calling the supplier again
+      if(table == null || cf == null) {
+        throw new IllegalStateException("You must configure " + HBASE_TABLE + " and " + HBASE_CF + " in the global config.");
+      }
+      try {
+        tableInterface = config.getTableProvider().getTable(HBaseConfiguration.create(), table);
+        this.cf = Bytes.toBytes(cf); // explicit UTF-8 rather than the platform default charset
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to initialize HBaseDao: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  public HTableInterface getTableInterface() { // lazily re-runs init() with the last AccessConfig; NOTE(review): config is null if init() was never called
+    if(tableInterface == null) {
+      init(config);
+    }
+    return tableInterface;
+  }
+
+  @Override
+  public synchronized Document getLatest(String guid, String sensorType) throws IOException { // returns null when the row/family holds no versions
+    Get get = new Get(Bytes.toBytes(guid)); // UTF-8 row key, same encoding update() writes with
+    get.addFamily(cf);
+    Result result = getTableInterface().get(get);
+    NavigableMap<byte[], byte[]> columns = result.getFamilyMap( cf);
+    if(columns == null || columns.size() == 0) {
+      return null;
+    }
+    Map.Entry<byte[], byte[]> entry= columns.lastEntry(); // qualifiers are the timestamps written by update(); the highest is taken as latest (assumes non-negative longs so byte order == numeric order)
+    Long ts = Bytes.toLong(entry.getKey());
+    if(entry.getValue()!= null) {
+      String json = Bytes.toString(entry.getValue()); // decode as UTF-8 (JSON's standard encoding), not the platform default
+      return new Document(json, guid, sensorType, ts);
+    }
+    else {
+      return null;
+    }
+  }
+
+  @Override
+  public synchronized void update(Document update, Optional<String> index) throws IOException { // `index` is not used: every document goes to the single configured table
+    Put put = new Put(Bytes.toBytes(update.getGuid())); // UTF-8 row key, matching getLatest()
+    long ts = update.getTimestamp() == null?System.currentTimeMillis():update.getTimestamp();
+    byte[] columnQualifier = Bytes.toBytes(ts); // each version is stored under its timestamp as the column qualifier
+    byte[] doc = JSONUtils.INSTANCE.toJSON(update.getDocument());
+    put.addColumn(cf, columnQualifier, doc);
+    getTableInterface().put(put);
+  }
+
+  @Override
+  public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException { // not supported by this DAO
+    return null;
+  }
+
+  @Override
+  public Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws IOException { // not supported by this DAO
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
index 31fe74e..350e402 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
@@ -17,18 +17,129 @@
 */
 package org.apache.metron.indexing.dao;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.flipkart.zjsonpatch.JsonPatch;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.update.Document;
+import
org.apache.metron.indexing.dao.update.PatchRequest;
+import org.apache.metron.indexing.dao.update.ReplaceRequest;
+import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
+
+import java.io.IOException; // NOTE: duplicates the pre-existing java.io.IOException import below; legal Java, but redundant
 import org.apache.metron.indexing.dao.search.FieldType;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public interface IndexDao {
+
+  /**
+   * Return search response based on the search request
+   *
+   * @param searchRequest The indices, query, paging (from/size), sort and facet fields to search with.
+   * @return The search response; implementations that do not support search may return null.
+   * @throws InvalidSearchException If the search cannot be performed.
+   */
   SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException;
-  void init(Map<String, Object> globalConfig, AccessConfig config);
+
+  /**
+   * Initialize the DAO with the AccessConfig object.
+   * @param config The access configuration (e.g. global config supplier, table provider) the DAO reads its settings from.
+   */
+  void init(AccessConfig config);
+
+  /**
+   * Return the latest version of a document given the GUID and the sensor type.
+   *
+   * @param guid The GUID for the document
+   * @param sensorType The sensor type of the document
+   * @return The Document matching or null if not available.
+   * @throws IOException If the underlying store cannot be read.
+   */
+  Document getLatest(String guid, String sensorType) throws IOException;
+
+  /**
+   * Return the latest version of a document given a GetRequest.
+   * @param request The GetRequest which indicates the GUID and sensor type.
+   * @return Optionally the document (dependent upon existence in the index).
+   * @throws IOException If the underlying store cannot be read.
+   */
+  default Optional<Map<String, Object>> getLatestResult(GetRequest request) throws IOException {
+    Document ret = getLatest(request.getGuid(), request.getSensorType());
+    if(ret == null) {
+      return Optional.empty();
+    }
+    else {
+      return Optional.ofNullable(ret.getDocument());
+    }
+  }
+
+  /**
+   * Update given a Document and optionally the index where the document exists.
+   *
+   * @param update The new version of the document, identified by its GUID and sensor type.
+   * @param index The index where the document lives.
+   * @throws IOException If the write fails.
+   */
+  void update(Document update, Optional<String> index) throws IOException;
+
+
+  /**
+   * Update a document in an index given a JSON Patch (see RFC 6902 at https://tools.ietf.org/html/rfc6902)
+   * @param request The patch request
+   * @param timestamp Optionally a timestamp to set. If not specified then current time is used.
+   * @throws OriginalNotFoundException If the original is not found, then it cannot be patched.
+   * @throws IOException If reading the original or writing the patched document fails.
+   */
+  default void patch( PatchRequest request
+                    , Optional<Long> timestamp
+                    ) throws OriginalNotFoundException, IOException {
+    Map<String, Object> latest = request.getSource();
+    if(latest == null) {
+      Document latestDoc = getLatest(request.getGuid(), request.getSensorType());
+      if(latestDoc != null && latestDoc.getDocument() != null) { // getLatest() returns null when no such document exists
+        latest = latestDoc.getDocument();
+      }
+      else {
+        throw new OriginalNotFoundException("Unable to patch an document that doesn't exist and isn't specified.");
+      }
+    }
+    JsonNode originalNode = JSONUtils.INSTANCE.convert(latest, JsonNode.class);
+    JsonNode patched = JsonPatch.apply(request.getPatch(), originalNode);
+    Map<String, Object> updated = JSONUtils.INSTANCE.getMapper()
+                                         .convertValue(patched, new TypeReference<Map<String, Object>>() {});
+    Document d = new Document( updated
+                             , request.getGuid()
+                             , request.getSensorType()
+                             , timestamp.orElse(System.currentTimeMillis())
+                             );
+    update(d, Optional.ofNullable(request.getIndex()));
+  }
+
+  /**
+   * Replace a document in an index.
+   * @param request The replacement request.
+   * @param timestamp The timestamp (optional) of the update. If not specified, then current time will be used.
+   * @throws IOException If the write fails.
+   */
+  default void replace( ReplaceRequest request
+                      , Optional<Long> timestamp
+                      ) throws IOException {
+    Document d = new Document(request.getReplacement()
+                             , request.getGuid()
+                             , request.getSensorType()
+                             , timestamp.orElse(System.currentTimeMillis())
+                             );
+    update(d, Optional.ofNullable(request.getIndex()));
+  }
+
   Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException;
   Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws IOException;
 }
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
index 9c2de0e..e8df0b7 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDaoFactory.java
@@ -17,14 +17,42 @@
 */
 package org.apache.metron.indexing.dao;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 public class IndexDaoFactory {
-  public static IndexDao create(String daoImpl, Map<String, Object> globalConfig, AccessConfig config) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
-    Class<? extends IndexDao> clazz = (Class<?
extends IndexDao>) Class.forName(daoImpl);
-    IndexDao instance = clazz.getConstructor().newInstance();
-    instance.init(globalConfig, config);
-    return instance;
+  public static List<IndexDao> create( String daoImpls // comma-separated IndexDao class names; each is built with its no-arg constructor, then init(config)
+                                     , AccessConfig config
+                                     ) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException
+  {
+    List<IndexDao> ret = new ArrayList<>();
+    for(String daoImpl : Splitter.on(',').trimResults().omitEmptyStrings().split(daoImpls)) { // tolerate "A, B" and stray/trailing commas
+      Class<? extends IndexDao> clazz = Class.forName(daoImpl).asSubclass(IndexDao.class); // checked: ClassCastException before a non-IndexDao class is instantiated
+      IndexDao instance = clazz.getConstructor().newInstance();
+      instance.init(config);
+      ret.add(instance);
+    }
+    return ret;
+  }
+
+  public static IndexDao combine(Iterable<IndexDao> daos) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { // combine without decorating the DAOs
+    return combine(daos, x -> x);
+  }
+
+  public static IndexDao combine(Iterable<IndexDao> daos, Function<IndexDao, IndexDao> daoTransformation) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { // one DAO is returned (transformed) directly; several are wrapped in a MultiIndexDao
+    int numDaos = Iterables.size(daos);
+    if(numDaos == 0) {
+      throw new IllegalArgumentException("Trying to combine 0 dao's into a DAO is not a supported configuration.");
+    }
+    if( numDaos == 1) {
+      return daoTransformation.apply(Iterables.getFirst(daos, null));
+    }
+    return new MultiIndexDao(daos, daoTransformation);
   }
 }
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexUpdateCallback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexUpdateCallback.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexUpdateCallback.java
new file mode 100644
index 0000000..4cb4d7e
--- /dev/null
+++
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexUpdateCallback.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+import java.io.IOException;
+
+public interface IndexUpdateCallback { // callback receiving the DAO and the Document involved in an update
+  void postUpdate(IndexDao dao, Document doc) throws IOException; // presumably invoked after `dao` has written `doc` (no caller visible here; confirm)
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
new file mode 100644
index 0000000..e9a4a9a
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.update.Document;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class MultiIndexDao implements IndexDao { // composes several IndexDaos: writes fan out to all; reads take the first non-null answer (getLatest: the newest document)
+  private List<IndexDao> indices; // the composed DAOs, in the order supplied
+
+  public MultiIndexDao( IndexDao... composedDao) {
+    indices = new ArrayList<>();
+    Collections.addAll(indices, composedDao);
+  }
+
+  public MultiIndexDao(Iterable<IndexDao> composedDao) {
+    this.indices = new ArrayList<>();
+    Iterables.addAll(indices, composedDao); // copies, so later changes to the Iterable are not seen
+  }
+
+  public MultiIndexDao(Iterable<IndexDao> composedDao, Function<IndexDao, IndexDao> decoratorTransformation) {
+    this(Iterables.transform(composedDao, x -> decoratorTransformation.apply(x))); // lazy view, materialized once by the copying constructor above
+  }
+
+  @Override
+  public void update(final Document update, Optional<String> index) throws IOException { // writes to every DAO in parallel; all failures are reported together
+    List<String> exceptions =
+    indices.parallelStream().map(dao -> {
+      try {
+        dao.update(update, index);
+        return null;
+      } catch (Throwable e) { // capture the failure so the remaining DAOs are still updated
+        return dao.getClass() + ": " + e.getMessage() + "\n" + ExceptionUtils.getStackTrace(e);
+      }
+    }).filter(e -> e != null).collect(Collectors.toList());
+    if(exceptions.size() > 0) {
+      throw new IOException(Joiner.on("\n").join(exceptions)); // NOTE: successful writes to the other DAOs are not rolled back
+    }
+  }
+
+  @Override
+  public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> in) throws IOException { // first non-null answer in DAO order, else null
+    for(IndexDao dao : indices) {
+      Map<String, Map<String, FieldType>> r = dao.getColumnMetadata(in);
+      if(r != null) {
+        return r;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Map<String, FieldType> getCommonColumnMetadata(List<String> in) throws IOException { // first non-null answer in DAO order, else null
+    for(IndexDao dao : indices) {
+      Map<String, FieldType> r = dao.getCommonColumnMetadata(in);
+      if(r != null) {
+        return r;
+      }
+    }
+    return null;
+  }
+
+  private static class DocumentContainer { // one DAO's getLatest outcome: either its (possibly absent) document or the Throwable it raised
+    private Optional<Document> d = Optional.empty();
+    private Optional<Throwable> t = Optional.empty();
+    public DocumentContainer(Document d) {
+      this.d = Optional.ofNullable(d);
+    }
+    public DocumentContainer(Throwable t) {
+      this.t = Optional.ofNullable(t);
+    }
+
+    public Optional<Document> getDocument() {
+      return d;
+    }
+    public Optional<Throwable> getException() {
+      return t;
+    }
+
+  }
+
+  @Override
+  public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { // first non-null response in DAO order, else null
+    for(IndexDao dao : indices) {
+      SearchResponse s = dao.search(searchRequest);
+      if(s != null) {
+        return s;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void init(AccessConfig config) { // initializes every composed DAO with the same config
+    for(IndexDao dao : indices) {
+      dao.init(config);
+    }
+  }
+
+  @Override
+  public Document getLatest(final String guid, String sensorType) throws IOException { // newest (by timestamp) document across all DAOs
+    Document ret = null;
+    List<DocumentContainer> output =
+    indices.parallelStream().map(dao -> {
+      try {
+        return new DocumentContainer(dao.getLatest(guid, sensorType));
+      } catch (Throwable e) {
+        return new DocumentContainer(e);
+      }
+    }).collect(Collectors.toList()); // encounter order is preserved, so output lines up with indices
+
+    List<String> error = new ArrayList<>();
+    for(DocumentContainer dc : output) {
+      if(dc.getException().isPresent()) {
+        Throwable e = dc.getException().get();
+        error.add(e.getMessage() + "\n" + ExceptionUtils.getStackTrace(e));
+      }
+      else {
+        if(dc.getDocument().isPresent()) {
+          Document d = dc.getDocument().get();
+          if(ret == null || ret.getTimestamp() < d.getTimestamp()) { // strict '<': on equal timestamps the earlier DAO's document wins
+            ret = d;
+          }
+        }
+      }
+    }
+    if(error.size() > 0) { // any failing DAO fails the whole read, even if others returned a document
+      throw new IOException(Joiner.on("\n").join(error));
+    }
+    return ret;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
new file mode 100644
index 0000000..eb255dc
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+public class GetRequest { // identifies a single document by its GUID and sensor type
+  String guid;
+  String sensorType;
+
+  /**
+   * The GUID of the document
+   * @return the GUID of the requested document
+   */
+  public String getGuid() {
+    return guid;
+  }
+
+  public void setGuid(String guid) {
+    this.guid = guid;
+  }
+
+  /**
+   * The sensor type of the indices that you're searching.
+   * @return the sensor type
+   */
+  public String getSensorType() {
+    return sensorType;
+  }
+
+  public void setSensorType(String sensorType) {
+    this.sensorType = sensorType;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
index b92b36d..897f918 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java
@@ -39,6 +39,10 @@ public class SearchRequest {
     facetFields = new ArrayList<>();
   }
 
+  /**
+   * The list of indices to search.
+   * @return the indices to search
+   */
   public List<String> getIndices() {
     return indices;
   }
@@ -47,6 +51,10 @@ public class SearchRequest {
     this.indices = indices;
   }
 
+  /**
+   * The query to use to search the index
+   * @return the query string
+   */
   public String getQuery() {
     return query;
   }
@@ -55,6 +63,10 @@ public class SearchRequest {
     this.query = query;
   }
 
+  /**
+   * The size of the results returned.
+   * @return the number of results to return
+   */
   public int getSize() {
     return size;
   }
@@ -63,6 +75,10 @@ public class SearchRequest {
     this.size = size;
   }
 
+  /**
+   * The position in the result list to start the search from (an offset, not an index name).
+   * @return the starting offset
+   */
   public int getFrom() {
     return from;
   }
@@ -71,6 +87,10 @@ public class SearchRequest {
     this.from = from;
   }
 
+  /**
+   * The fields to order the search results by.
+   * @return the sort fields
+   */
   public List<SortField> getSort() {
     return sort;
   }
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
index 2416357..aad489a 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java
@@ -29,6 +29,10 @@ public class SearchResponse {
   private List<SearchResult> results = new ArrayList<>();
   private Map<String, Map<String, Long>> facetCounts;
 
+  /**
+   * The total number of results
+   * @return the total number of results
+   */
   public long getTotal() {
     return total;
   }
@@ -37,6 +41,10 @@ public class SearchResponse {
     this.total = total;
   }
 
+  /**
+   * The list of results
+   * @return the results
+   */
   public List<SearchResult> getResults() {
     return results;
   }
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResult.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResult.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResult.java
index ae4f9bd..9c00bea 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResult.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResult.java
@@ -24,7 +24,24 @@ public class SearchResult {
   private String id;
   private Map<String, Object>
source;
   private float score;
+  private String index;
 
+  /**
+   * The index that the result comes from
+   * @return the name of the index holding this result
+   */
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  /**
+   * The ID of the document from the index.
+   * @return the document ID
+   */
   public String getId() {
     return id;
   }
@@ -33,6 +50,10 @@ public class SearchResult {
     this.id = id;
   }
 
+  /**
+   * The source (the actual result).
+   * @return the source document as a map
+   */
   public Map<String, Object> getSource() {
     return source;
   }
@@ -41,6 +62,10 @@ public class SearchResult {
     this.source = source;
   }
 
+  /**
+   * The score from the index.
+   * @return the score reported by the index
+   */
   public float getScore() {
     return score;
   }
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
new file mode 100644
index 0000000..85c079f
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao.update;
+
+import com.fasterxml.jackson.core.JsonProcessingException; // NOTE(review): unused in this file
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode; // NOTE(review): unused in this file
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional; // NOTE(review): unused in this file
+
+public class Document { // one version of a document: its content map plus GUID, sensor type and timestamp
+  Long timestamp; // setTimestamp() replaces null with the current time
+  Map<String, Object> document;
+  String guid;
+  String sensorType;
+
+  public Document(Map<String, Object> document, String guid, String sensorType, Long timestamp) { // NOTE(review): calls overridable setters from the constructor
+    setDocument(document);
+    setGuid(guid);
+    setTimestamp(timestamp); // a null timestamp becomes the current time
+    setSensorType(sensorType);
+  }
+
+
+  public Document(String document, String guid, String sensorType, Long timestamp) throws IOException { // parses the JSON text into a map first
+    this(convertDoc(document), guid, sensorType, timestamp);
+  }
+
+  public Document(String document, String guid, String sensorType) throws IOException { // timestamp defaults to the current time
+    this( document, guid, sensorType, null);
+  }
+
+  private static Map<String, Object> convertDoc(String document) throws IOException { // JSON text -> Map via JSONUtils; presumably IOException on malformed JSON
+    return JSONUtils.INSTANCE.load(document, new TypeReference<Map<String, Object>>() {
+    });
+  }
+
+  public String getSensorType() {
+    return sensorType;
+  }
+
+  public void setSensorType(String sensorType) {
+    this.sensorType = sensorType;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(Long timestamp) {
+    this.timestamp = timestamp != null?timestamp:System.currentTimeMillis(); // null means "now"
+  }
+
+  public Map<String, Object> getDocument() {
+    return document;
+  }
+
+  public void setDocument(Map<String, Object> document) {
+    this.document = document;
+  }
+
+  public String getGuid() {
+    return guid;
+  }
+
+  public void setGuid(String guid) {
+    this.guid = guid;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/OriginalNotFoundException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/OriginalNotFoundException.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/OriginalNotFoundException.java
new file mode 100644
index 0000000..87a03ae
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/OriginalNotFoundException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.update;
+
+public class OriginalNotFoundException extends Exception { // checked: the original document needed for an update could not be found
+  public OriginalNotFoundException(String s) {
+    super(s);
+  }
+
+  public OriginalNotFoundException(String s, Exception e) { // keeps e as the cause
+    super(s, e);
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/PatchRequest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/PatchRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/PatchRequest.java
new file mode 100644
index 0000000..77f5958
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/PatchRequest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.update;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.Map;
+
+public class PatchRequest { // an RFC 6902 JSON Patch to apply to one document
+  JsonNode patch;
+  Map<String, Object> source;
+  String guid;
+  String sensorType;
+  String index;
+
+  /**
+   * The index of the document to be updated. This is optional, but could result in a performance gain if specified.
+   * @return the index name, or null if not specified
+   */
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  /**
+   * The patch. This is in the form of a list of RFC 6902 patches.
+   * For example:
+   * <pre>
+   * [
+   *   {
+   *     "op": "add"
+   *    , "path": "/project"
+   *    , "value": "metron"
+   *   }
+   * ]
+   * </pre>
+   * @return the patch operations as a JSON array
+   */
+  public JsonNode getPatch() {
+    return patch;
+  }
+
+  public void setPatch(JsonNode patch) {
+    this.patch = patch;
+  }
+
+  /**
+   * The source document. If this is specified, then it will be used as the basis of the patch rather than the current
+   * document in the index.
+   * @return the base document, or null to patch the latest stored version
+   */
+  public Map<String, Object> getSource() {
+    return source;
+  }
+
+  public void setSource(Map<String, Object> source) {
+    this.source = source;
+  }
+
+  /**
+   * The GUID of the document to be patched.
+   * @return the GUID
+   */
+  public String getGuid() {
+    return guid;
+  }
+
+  public void setGuid(String guid) {
+    this.guid = guid;
+  }
+
+  /**
+   * The sensor type of the document.
+   * @return the sensor type
+   */
+  public String getSensorType() {
+    return sensorType;
+  }
+
+  public void setSensorType(String sensorType) {
+    this.sensorType = sensorType;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/ReplaceRequest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/ReplaceRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/ReplaceRequest.java
new file mode 100644
index 0000000..96bca30
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/ReplaceRequest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.update;
+
+import com.fasterxml.jackson.databind.JsonNode; // NOTE(review): unused in this file
+
+import java.util.Map;
+
+public class ReplaceRequest { // a full replacement of one document's content
+  Map<String, Object> replacement;
+  String guid;
+  String sensorType;
+  String index;
+
+  /**
+   * Return the index of the request. This is optional, but could result in better performance if specified.
+   * @return the index name, or null if not specified
+   */
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  /**
+   * The sensor type of the request. This is mandatory.
+   * @return the sensor type
+   */
+  public String getSensorType() {
+    return sensorType;
+  }
+
+  public void setSensorType(String sensorType) {
+    this.sensorType = sensorType;
+  }
+
+  /**
+   * The replacement document. This is mandatory.
+   * @return the replacement document content
+   */
+  public Map<String, Object> getReplacement() {
+    return replacement;
+  }
+
+  public void setReplacement(Map<String, Object> replacement) {
+    this.replacement = replacement;
+  }
+
+  /**
+   * The GUID of the document to replace. This is mandatory.
+   * @return the GUID of the document to replace
+   */
+  public String getGuid() {
+    return guid;
+  }
+
+  public void setGuid(String guid) {
+    this.guid = guid;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
index ab83c7e..2d146e0 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
@@ -22,8 +22,10 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration; // NOTE(review): not used in the lines shown here; check whether it is needed
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.*;
+import org.apache.metron.indexing.dao.update.Document;
 
 import java.io.IOException;
 import java.util.*;
@@ -129,11 +131,41 @@ public class InMemoryDao implements IndexDao {
   }
 
   @Override
-  public void init(Map<String, Object> globalConfig, AccessConfig config) {
+  public void init(AccessConfig config) {
     this.config = config;
   }
 
   @Override
+  public Document getLatest(String guid, String sensorType) throws IOException { // linear scan of every backing-store entry whose key starts with sensorType
+    for(Map.Entry<String, List<String>> kv: BACKING_STORE.entrySet()) {
+      if(kv.getKey().startsWith(sensorType)) {
+        for(String doc : kv.getValue()) {
+          Map<String, Object> docParsed = parse(doc);
+          if(docParsed.getOrDefault(Constants.GUID, "").equals(guid)) {
+            return new Document(doc, guid, sensorType, 0L); // fixed timestamp 0: the in-memory store keeps only one copy per GUID
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void update(Document update, Optional<String> index) throws IOException { // NOTE(review): `index` is ignored
+    for (Map.Entry<String, List<String>> kv : BACKING_STORE.entrySet()) {
+      if (kv.getKey().startsWith(update.getSensorType())) {
+        for (Iterator<String> it = kv.getValue().iterator(); it.hasNext(); ) {
+          String doc = it.next();
+          Map<String, Object> docParsed = parse(doc);
+          if (docParsed.getOrDefault(Constants.GUID, "").equals(update.getGuid())) {
+            it.remove(); // drop every stale copy of this GUID
+          }
+        }
+        kv.getValue().add(JSONUtils.INSTANCE.toJSON(update.getDocument(), true)); // NOTE(review): appended to every entry matching the sensor-type prefix, even ones that never held this GUID; confirm intended
+      }
+    }
+  }
+
   public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException {
     Map<String, Map<String, FieldType>> columnMetadata = new HashMap<>();
     for(String index: indices) {