This is an automated email from the ASF dual-hosted git repository. nickallen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push: new eb2aee1 METRON-1879 Allow Elasticsearch to Auto-Generate the Document ID (nickwallen) closes apache/metron#1269 eb2aee1 is described below commit eb2aee1820cae811c6491d68aed32ccc055922e4 Author: nickwallen <n...@nickallen.org> AuthorDate: Thu Dec 13 14:29:16 2018 -0500 METRON-1879 Allow Elasticsearch to Auto-Generate the Document ID (nickwallen) closes apache/metron#1269 --- .../e2e/alerts-list/alerts-list.e2e-spec.ts | 19 +- .../configure-table/configure-table.e2e-spec.ts | 17 +- .../table-view/table-view.component.html | 2 +- .../alerts-list/table-view/table-view.component.ts | 9 +- .../alerts-list/tree-view/tree-view.component.ts | 2 +- .../app/service/elasticsearch-localstorage-impl.ts | 2 +- .../bulk/ElasticsearchBulkDocumentWriter.java | 17 +- .../dao/ElasticsearchMetaAlertUpdateDao.java | 16 +- .../dao/ElasticsearchRequestSubmitter.java | 3 +- .../dao/ElasticsearchRetrieveLatestDao.java | 127 ++++----- .../bulk/ElasticsearchBulkDocumentWriterTest.java | 35 ++- .../dao/ElasticsearchRequestSubmitterTest.java | 12 +- ...ticsearchBulkDocumentWriterIntegrationTest.java | 159 ++++++++++++ .../apache/metron/indexing/dao/MultiIndexDao.java | 115 ++++++--- .../indexing/dao/metaalert/MetaAlertConstants.java | 14 + .../metron/indexing/dao/update/Document.java | 124 ++++++--- .../metron/indexing/dao/update/UpdateDao.java | 34 ++- .../dao/metaalert/MetaAlertIntegrationTest.java | 284 +++++++++++---------- 18 files changed, 670 insertions(+), 321 deletions(-) diff --git a/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts b/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts index e3709ab..70f52de 100644 --- a/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts +++ b/metron-interface/metron-alerts/e2e/alerts-list/alerts-list.e2e-spec.ts @@ -24,9 +24,9 @@ import { loadTestData, deleteTestData } from '../utils/e2e_util'; describe('Test spec for all ui elements & list view', function() { let page: MetronAlertsPage; let loginPage: LoginPage; - let columnNames = [ '', 'Score', 'id', 'timestamp', 'source:type', 'ip_src_addr', 'enrichm...:country', + let columnNames = [ '', 'Score', 'guid', 'timestamp', 'source:type', 'ip_src_addr', 'enrichm...:country', 'ip_dst_addr', 'host', 'alert_status', '', '', '']; - let colNamesColumnConfig = [ 'score', 'id', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country', + let colNamesColumnConfig = [ 'score', 'guid', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country', 'ip_dst_addr', 'host', 'alert_status' ]; beforeAll(async function() : Promise<any> { @@ -136,16 +136,17 @@ describe('Test spec for all ui elements & list view', function() { }); it('should select columns from table configuration', async function() : Promise<any> { - let newColNamesColumnConfig = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country', - 'ip_dst_addr', 'host', 'alert_status', 'guid' ]; - await page.clickConfigureTable(); - expect(await page.getSelectedColumnNames()).toEqual(colNamesColumnConfig, 'for default selected column names'); - await page.toggleSelectCol('id'); + expect(await page.getSelectedColumnNames()).toEqual(colNamesColumnConfig, 'expect default selected column names'); + + // remove the 'guid' column and add the 'id' column await page.toggleSelectCol('guid'); - expect(await page.getSelectedColumnNames()).toEqual(newColNamesColumnConfig, 'for guid added to selected column names'); - await page.saveConfigureColumns(); + await page.toggleSelectCol('id'); + let expectedColumns = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country', + 'ip_dst_addr', 'host', 'alert_status', 'id' ]; + expect(await page.getSelectedColumnNames()).toEqual(expectedColumns, 'expect "id" field added and "guid" field removed from visible columns'); + await page.saveConfigureColumns(); }); it('should have all time-range controls', async function() : Promise<any> { diff --git a/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts b/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts index c3636f5..39504a9 100644 --- a/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts +++ b/metron-interface/metron-alerts/e2e/alerts-list/configure-table/configure-table.e2e-spec.ts @@ -24,7 +24,7 @@ import {loadTestData, deleteTestData} from '../../utils/e2e_util'; describe('Test spec for table column configuration', function() { let page: MetronAlertsPage; let loginPage: LoginPage; - let colNamesColumnConfig = [ 'score', 'id', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country', + let colNamesColumnConfig = [ 'score', 'guid', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country', 'ip_dst_addr', 'host', 'alert_status' ]; beforeAll(async function() : Promise<any> { @@ -45,17 +45,18 @@ describe('Test spec for table column configuration', function() { }); it('should select columns from table configuration', async function() : Promise<any> { - let newColNamesColumnConfig = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country', - 'ip_dst_addr', 'host', 'alert_status', 'guid' ]; - await page.clearLocalStorage(); await page.navigateTo(); + await page.clickConfigureTable(); + expect(await page.getSelectedColumnNames()).toEqualBcoz(colNamesColumnConfig, 'for default selected column names'); - await page.clickConfigureTable(); - expect(await page.getSelectedColumnNames()).toEqualBcoz(colNamesColumnConfig, 'for default selected column names'); - await page.toggleSelectCol('id'); + // remove the 'guid' column and add the 'id' column await page.toggleSelectCol('guid'); - expect(await page.getSelectedColumnNames()).toEqualBcoz(newColNamesColumnConfig, 'for guid added to selected column names'); + await page.toggleSelectCol('id'); + + let expectedColumns = [ 'score', 'timestamp', 'source:type', 'ip_src_addr', 'enrichments:geo:ip_dst_addr:country', + 'ip_dst_addr', 'host', 'alert_status', 'id' ]; + expect(await page.getSelectedColumnNames()).toEqualBcoz(expectedColumns, 'expect "id" field added and "guid" field removed from visible columns'); await page.saveConfigureColumns(); }); diff --git a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html index 027f57a..718a41f 100644 --- a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html +++ b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.html @@ -61,7 +61,7 @@ <span appAlertSeverity [severity]="getScore(alert.source)"> <a> {{ hasScore(alert.source) ? getScore(alert.source) : '-' }} </a> </span> </td> <td [attr.colspan]="alertsColumnsToDisplay.length - 1"> - <a (click)="addFilter('guid', alert.id)" [attr.title]="alert.id" style="color:#689AA9"> {{ alert.source['name'] ? alert.source['name'] : alert.id | centerEllipses:20:cell }}</a> + <a (click)="addFilter('guid', alert.source['guid'])" [attr.title]="alert.source['guid']" style="color:#689AA9"> {{ alert.source['name'] ? alert.source['name'] : alert.source['guid'] | centerEllipses:20:cell }}</a> <span> ({{ alert.source.metron_alert.length }})</span> </td> <td> diff --git a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts index fd47b67..2190beb 100644 --- a/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts +++ b/metron-interface/metron-alerts/src/app/alerts/alerts-list/table-view/table-view.component.ts @@ -141,14 +141,14 @@ export class TableViewComponent implements OnInit, OnChanges, OnDestroy { onSort(sortEvent: SortEvent) { let sortOrder = (sortEvent.sortOrder === Sort.ASC ? 'asc' : 'desc'); - let sortBy = sortEvent.sortBy === 'id' ? 'guid' : sortEvent.sortBy; + let sortBy = sortEvent.sortBy === 'id' ? '_uid' : sortEvent.sortBy; this.queryBuilder.setSort(sortBy, sortOrder); this.onRefreshData.emit(true); } getValue(alert: Alert, column: ColumnMetadata, formatData: boolean) { if (column.name === 'id') { - return this.formatValue(column, alert[column.name]); + return this.formatValue(column, alert['id']); } return this.getValueFromSource(alert.source, column, formatData); @@ -158,9 +158,6 @@ export class TableViewComponent implements OnInit, OnChanges, OnDestroy { let returnValue = ''; try { switch (column.name) { - case 'id': - returnValue = alertSource['guid']; - break; case 'alert_status': returnValue = alertSource['alert_status'] ? alertSource['alert_status'] : 'NEW'; break; @@ -218,7 +215,7 @@ export class TableViewComponent implements OnInit, OnChanges, OnDestroy { } addFilter(field: string, value: string) { - field = (field === 'id') ? 'guid' : field; + field = (field === 'id') ? '_id' : field; this.onAddFilter.emit(new Filter(field, value)); } diff --git a/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts b/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts index ab1d4eb..19aaa6e 100644 --- a/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts +++ b/metron-interface/metron-alerts/src/app/alerts/alerts-list/tree-view/tree-view.component.ts @@ -323,7 +323,7 @@ export class TreeViewComponent extends TableViewComponent implements OnInit, OnC } sortTreeSubGroup($event, treeGroup: TreeGroupData) { - let sortBy = $event.sortBy === 'id' ? 'guid' : $event.sortBy; + let sortBy = $event.sortBy === 'id' ? '_uid' : $event.sortBy; let sortOrder = $event.sortOrder === Sort.ASC ? 'asc' : 'desc'; let sortField = new SortField(sortBy, sortOrder); diff --git a/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts b/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts index 6fd4107..2c91d4c 100644 --- a/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts +++ b/metron-interface/metron-alerts/src/app/service/elasticsearch-localstorage-impl.ts @@ -45,7 +45,7 @@ export class ElasticSearchLocalstorageImpl extends DataSource { sourceType: 'source:type'; private defaultColumnMetadata = [ - new ColumnMetadata('id', 'string'), + new ColumnMetadata('guid', 'string'), new ColumnMetadata('timestamp', 'date'), new ColumnMetadata('source:type', 'string'), new ColumnMetadata('ip_src_addr', 'ip'), diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java index 9e6e568..7aea2fc 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java @@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; /** @@ -120,10 +119,23 @@ public class ElasticsearchBulkDocumentWriter<D extends Document> implements Bulk if(document.getTimestamp() == null) { throw new IllegalArgumentException("Document must contain the timestamp"); } + + // if updating an existing document, the doc ID should be defined. + // if creating a new document, set the doc ID to null to allow Elasticsearch to generate one. + String docId = document.getDocumentID().orElse(null); + if(LOG.isDebugEnabled() && document.getDocumentID().isPresent()) { + LOG.debug("Updating existing document with known doc ID; docID={}, guid={}, sensorType={}", + docId, document.getGuid(), document.getSensorType()); + } else if(LOG.isDebugEnabled()) { + LOG.debug("Creating a new document, doc ID not yet known; guid={}, sensorType={}", + document.getGuid(), document.getSensorType()); + } + return new IndexRequest() .source(document.getDocument()) .type(document.getSensorType() + "_doc") - .id(document.getGuid()) + .index(index) + .id(docId) .index(index) .timestamp(document.getTimestamp().toString()); } @@ -149,6 +161,7 @@ public class ElasticsearchBulkDocumentWriter<D extends Document> implements Bulk } else { // request succeeded D success = getDocument(response.getItemId()); + success.setDocumentID(response.getResponse().getId()); results.addSuccess(success); } } diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java index 2e9c855..519e803 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java @@ -44,11 +44,13 @@ import org.apache.metron.indexing.dao.metaalert.lucene.AbstractLuceneMetaAlertUp import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.InvalidCreateException; import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; import org.apache.metron.indexing.dao.update.Document; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao { @@ -148,8 +150,11 @@ public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpda try { // We need to update an alert itself. Only that portion of the update can be delegated. // We still need to get meta alerts potentially associated with it and update. - Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream() - .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), MetaAlertConstants.METAALERT_TYPE, update.getTimestamp())) + SearchResponse response = getMetaAlertsForAlert(update.getGuid()); + Collection<Document> metaAlerts = response + .getResults() + .stream() + .map(result -> toDocument(result, update.getTimestamp())) .collect(Collectors.toList()); // Each meta alert needs to be updated with the new alert for (Document metaAlert : metaAlerts) { @@ -172,6 +177,13 @@ public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpda } } + private Document toDocument(SearchResult result, Long timestamp) { + Document document = Document.fromJSON(result.getSource()); + document.setTimestamp(timestamp); + document.setDocumentID(result.getId()); + return document; + } + @Override public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException { return getUpdateDao().addCommentToAlert(request); diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java index c63532e..dca74bc 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java @@ -60,7 +60,8 @@ public class ElasticsearchRequestSubmitter { org.elasticsearch.action.search.SearchResponse esResponse; try { esResponse = client.getHighLevelClient().search(request); - LOG.debug("Got Elasticsearch response; response={}", esResponse.toString()); + LOG.debug("Got Elasticsearch response with {} hit(s); response={}", + esResponse.getHits().getTotalHits(), esResponse.toString()); } catch (Exception e) { String msg = String.format( diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java index 0c91007..95d27db 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java @@ -18,8 +18,18 @@ package org.apache.metron.elasticsearch.dao; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; +import org.apache.metron.common.Constants; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; +import org.apache.metron.indexing.dao.RetrieveLatestDao; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -29,29 +39,23 @@ import java.util.List; import java.util.Optional; import java.util.function.Function; -import org.apache.metron.elasticsearch.client.ElasticsearchClient; -import org.apache.metron.indexing.dao.RetrieveLatestDao; -import org.apache.metron.indexing.dao.search.GetRequest; -import org.apache.metron.indexing.dao.update.Document; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.index.query.IdsQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.builder.SearchSourceBuilder; +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; +import static org.elasticsearch.index.query.QueryBuilders.termsQuery; +import static org.elasticsearch.index.query.QueryBuilders.typeQuery; public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao { - private ElasticsearchClient transportClient; + private ElasticsearchClient client; + private ElasticsearchRequestSubmitter submitter; - public ElasticsearchRetrieveLatestDao(ElasticsearchClient transportClient) { - this.transportClient = transportClient; + public ElasticsearchRetrieveLatestDao(ElasticsearchClient client) { + this.client = client; + this.submitter = new ElasticsearchRequestSubmitter(client); } @Override public Document getLatest(String guid, String sensorType) throws IOException { - Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)); + Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(hit)); return doc.orElse(null); } @@ -63,21 +67,7 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao { guids.add(getRequest.getGuid()); sensorTypes.add(getRequest.getSensorType()); } - List<Document> documents = searchByGuids( - guids, - sensorTypes, - hit -> { - Long ts = 0L; - String doc = hit.getSourceAsString(); - String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); - try { - return Optional.of(new Document(doc, hit.getId(), sourceType, ts)); - } catch (IOException e) { - throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); - } - } - - ); + List<Document> documents = searchByGuids(guids, sensorTypes, hit -> toDocument(hit)); return documents; } @@ -102,54 +92,47 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao { if (guids == null || guids.isEmpty()) { return Collections.emptyList(); } - QueryBuilder query = null; - IdsQueryBuilder idsQuery; - if (sensorTypes != null) { - String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc") - .toArray(String[]::new); - idsQuery = QueryBuilders.idsQuery(types); - } else { - idsQuery = QueryBuilders.idsQuery(); - } - for (String guid : guids) { - query = idsQuery.addIds(guid); + // should match any of the guids + // the 'guid' field must be of type 'keyword' or this term query will not match + BoolQueryBuilder guidQuery = boolQuery().must(termsQuery(Constants.GUID, guids)); + + // should match any of the sensor types + BoolQueryBuilder sensorQuery = boolQuery(); + sensorTypes.forEach(sensorType -> sensorQuery.should(typeQuery(sensorType + "_doc"))); + + // must have a match for both guid and sensor + BoolQueryBuilder query = boolQuery() + .must(guidQuery) + .must(sensorQuery); + + // submit the search + SearchResponse response; + try { + SearchSourceBuilder source = new SearchSourceBuilder() + .query(query) + .size(guids.size()); + SearchRequest request = new SearchRequest().source(source); + response = submitter.submitSearch(request); + + } catch(InvalidSearchException e) { + throw new IOException(e); } - SearchRequest request = new SearchRequest(); - SearchSourceBuilder builder = new SearchSourceBuilder(); - builder.query(query); - builder.size(guids.size()); - request.source(builder); - - org.elasticsearch.action.search.SearchResponse response = transportClient.getHighLevelClient().search(request); - SearchHits hits = response.getHits(); + + // transform the search hits to results using the callback List<T> results = new ArrayList<>(); - for (SearchHit hit : hits) { + for(SearchHit hit: response.getHits()) { Optional<T> result = callback.apply(hit); - if (result.isPresent()) { - results.add(result.get()); - } + result.ifPresent(r -> results.add(r)); } + return results; } - private Optional<Document> toDocument(final String guid, SearchHit hit) { - Long ts = 0L; - String doc = hit.getSourceAsString(); - String sourceType = toSourceType(hit.getType()); - try { - return Optional.of(new Document(doc, guid, sourceType, ts)); - } catch (IOException e) { - throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); - } - } + private Optional<Document> toDocument(SearchHit hit) { + Document document = Document.fromJSON(hit.getSource()); + document.setDocumentID(hit.getId()); - /** - * Returns the source type based on a given doc type. - * @param docType The document type. - * @return The source type. - */ - private String toSourceType(String docType) { - return Iterables.getFirst(Splitter.on("_doc").split(docType), null); + return Optional.of(document); } } diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java index b313811..c6389d7 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java @@ -20,6 +20,7 @@ package org.apache.metron.elasticsearch.bulk; import org.apache.metron.common.Constants; import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; @@ -130,15 +131,21 @@ public class ElasticsearchBulkDocumentWriterTest { } private void setupElasticsearchToFail() throws IOException { + final String errorMessage = "error message"; + final Exception cause = new Exception("test exception"); + final boolean isFailed = true; + final int itemID = 0; + // define the item failure BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class); - when(failure.getCause()).thenReturn(new Exception("test exception")); - when(failure.getMessage()).thenReturn("error message"); + when(failure.getCause()).thenReturn(cause); + when(failure.getMessage()).thenReturn(errorMessage); // define the item level response BulkItemResponse itemResponse = mock(BulkItemResponse.class); - when(itemResponse.isFailed()).thenReturn(true); - when(itemResponse.getItemId()).thenReturn(0); + when(itemResponse.isFailed()).thenReturn(isFailed); + when(itemResponse.getItemId()).thenReturn(itemID); + when(itemResponse.getFailure()).thenReturn(failure); when(itemResponse.getFailureMessage()).thenReturn("error message"); List<BulkItemResponse> itemsResponses = Collections.singletonList(itemResponse); @@ -146,16 +153,32 @@ public class ElasticsearchBulkDocumentWriterTest { // define the bulk response to indicate failure BulkResponse response = mock(BulkResponse.class); when(response.iterator()).thenReturn(itemsResponses.iterator()); - when(response.hasFailures()).thenReturn(true); + when(response.hasFailures()).thenReturn(isFailed); // have the client return the mock response when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response); } private void setupElasticsearchToSucceed() throws IOException { + final String documentId = UUID.randomUUID().toString(); + final boolean isFailed = false; + final int itemID = 0; + + // the write response will contain what is used as the document ID + DocWriteResponse writeResponse = mock(DocWriteResponse.class); + when(writeResponse.getId()).thenReturn(documentId); + + // define the item level response + BulkItemResponse itemResponse = mock(BulkItemResponse.class); + when(itemResponse.isFailed()).thenReturn(isFailed); + when(itemResponse.getItemId()).thenReturn(itemID); + when(itemResponse.getResponse()).thenReturn(writeResponse); + List<BulkItemResponse> itemsResponses = Collections.singletonList(itemResponse); + // define the bulk response to indicate success BulkResponse response = mock(BulkResponse.class); - when(response.hasFailures()).thenReturn(false); + when(response.iterator()).thenReturn(itemsResponses.iterator()); + when(response.hasFailures()).thenReturn(isFailed); // have the client return the mock response when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response); diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java index 7a84588..917df4d 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java @@ -27,6 +27,7 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.Index; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchShardTarget; import org.junit.Test; @@ -55,15 +56,19 @@ public class ElasticsearchRequestSubmitterTest { @Test public void searchShouldSucceedWhenOK() throws InvalidSearchException, IOException { - // mocks SearchResponse response = mock(SearchResponse.class); SearchRequest request = new SearchRequest(); + // response will indicate 1 search hit + SearchHits hits = mock(SearchHits.class); + when(hits.getTotalHits()).thenReturn(1L); + // response will have status of OK and no failed shards when(response.status()).thenReturn(RestStatus.OK); when(response.getFailedShards()).thenReturn(0); when(response.getTotalShards()).thenReturn(2); + when(response.getHits()).thenReturn(hits); // search should succeed ElasticsearchRequestSubmitter submitter = setup(response); @@ -99,9 +104,14 @@ public class ElasticsearchRequestSubmitterTest { // response will have status of OK when(response.status()).thenReturn(RestStatus.OK); + // response will indicate 1 search hit + SearchHits hits = mock(SearchHits.class); + when(hits.getTotalHits()).thenReturn(1L); + // the response will report shard failures when(response.getFailedShards()).thenReturn(1); when(response.getTotalShards()).thenReturn(2); + when(response.getHits()).thenReturn(hits); // the response will return the failures ShardSearchFailure[] failures = { fail }; diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchBulkDocumentWriterIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchBulkDocumentWriterIntegrationTest.java new file mode 100644 index 0000000..df4aeb0 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchBulkDocumentWriterIntegrationTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.integration; + +import org.apache.http.HttpEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.metron.common.Constants; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory; +import org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao; +import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Response; +import org.hamcrest.CoreMatchers; +import org.json.simple.JSONObject; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public class ElasticsearchBulkDocumentWriterIntegrationTest { + + @ClassRule + public static TemporaryFolder indexDir = new TemporaryFolder(); + private static String broTemplatePath = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template"; + private static ElasticSearchComponent elasticsearch; + private ElasticsearchClient client; + private ElasticsearchBulkDocumentWriter<Document> writer; + private ElasticsearchRetrieveLatestDao retrieveDao; + + @BeforeClass + public static void setupElasticsearch() throws Exception { + AccessConfig accessConfig = new AccessConfig(); + accessConfig.setGlobalConfigSupplier(() -> globals()); + + elasticsearch = new ElasticSearchComponent.Builder() + .withHttpPort(9211) + .withIndexDir(indexDir.getRoot()) + .withAccessConfig(accessConfig) + .build(); + elasticsearch.start(); + } + + @AfterClass + public static void tearDownElasticsearch() { + if(elasticsearch != null) { + elasticsearch.stop(); + } + } + + @Before + public void setup() throws Exception { + client = ElasticsearchClientFactory.create(globals()); + retrieveDao = new ElasticsearchRetrieveLatestDao(client); + writer = new ElasticsearchBulkDocumentWriter<>(client) + .withRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + + // add bro template + JSONObject broTemplate = JSONUtils.INSTANCE.load(new File(broTemplatePath), JSONObject.class); + String broTemplateJson = JSONUtils.INSTANCE.toJSON(broTemplate, true); + HttpEntity broEntity = new NStringEntity(broTemplateJson, ContentType.APPLICATION_JSON); + Response response = client + .getLowLevelClient() + .performRequest("PUT", "/_template/bro_template", Collections.emptyMap(), broEntity); + assertThat(response.getStatusLine().getStatusCode(), CoreMatchers.equalTo(200)); + } + + @After + public void tearDown() throws IOException { + if(client != null) { + client.close(); + } + } + + @Test + public void testWrite() throws Exception { + // create some documents to write + List<Document> documents = new ArrayList<>(); + for(int i=0; i<10; i++) { + Document document = Document.fromJSON(createMessage()); + documents.add(document); + } + + // write the documents + for(Document doc: documents) { + writer.addDocument(doc, "bro_index"); + } + writer.write(); + + // ensure the documents were written + for(Document expected: documents) { + Document actual = retrieveDao.getLatest(expected.getGuid(), expected.getSensorType()); + assertNotNull("No document found", actual); + assertEquals(expected.getGuid(), actual.getGuid()); + assertEquals(expected.getSensorType(), actual.getSensorType()); + assertEquals(expected.getDocument(), actual.getDocument()); + assertTrue(actual.getDocumentID().isPresent()); + + // the document ID and GUID should not be the same, since the document ID was auto-generated + assertNotEquals(actual.getDocument(), actual.getGuid()); + } + } + + private static Map<String, Object> globals() { + Map<String, Object> globals = new HashMap<>(); + globals.put("es.clustername", "metron"); + globals.put("es.ip", "localhost"); + globals.put("es.port", "9200"); + globals.put("es.date.format", "yyyy.MM.dd.HH"); + return globals; + } + + private JSONObject createMessage() { + JSONObject message = new JSONObject(); + message.put(Constants.GUID, UUID.randomUUID().toString()); + message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis()); + message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1"); + message.put("source:type", "bro"); + return message; + } +} diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java index c3e2108..09f7df9 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java @@ -20,14 +20,7 @@ package org.apache.metron.indexing.dao; import com.google.common.base.Joiner; import com.google.common.collect.Iterables; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; +import org.apache.commons.lang.ClassUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.indexing.dao.search.GetRequest; @@ -38,11 +31,25 @@ import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; import org.apache.metron.indexing.dao.update.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; public class MultiIndexDao implements IndexDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private List<IndexDao> indices; - public MultiIndexDao( IndexDao... composedDao) { + public MultiIndexDao(IndexDao... composedDao) { indices = new ArrayList<>(); Collections.addAll(indices, composedDao); } @@ -117,18 +124,30 @@ public class MultiIndexDao implements IndexDao { */ @Override public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest) throws IOException { - List<DocumentContainer> output = - indices.parallelStream().map(dao -> { - try { - return new DocumentContainer(dao.addCommentToAlert(request, latest)); - } catch (Throwable e) { - return new DocumentContainer(e); - } - }).collect(Collectors.toList()); - + List<DocumentContainer> output = indices + .parallelStream() + .map(dao -> addCommentToAlert(dao, request, latest)) + .collect(Collectors.toList()); return getLatestDocument(output); } + private DocumentContainer addCommentToAlert(IndexDao indexDao, CommentAddRemoveRequest request, Document latest) { + DocumentContainer container; + try { + Document document = indexDao.addCommentToAlert(request, latest); + container = new DocumentContainer(document); + LOG.debug("Added comment to alert; indexDao={}, guid={}, sensorType={}, document={}", + ClassUtils.getShortClassName(indexDao.getClass()), document.getGuid(), document.getSensorType(), document); + + } catch (Throwable e) { + container = new DocumentContainer(e); + LOG.error("Unable to add comment to alert; indexDao={}, error={}", + ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e)); + } + + return container; + } + @Override public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException { Document latest = getLatest(request.getGuid(), request.getSensorType()); @@ -145,18 +164,30 @@ public class MultiIndexDao implements IndexDao { */ @Override public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) throws IOException { - List<DocumentContainer> output = - indices.parallelStream().map(dao -> { - try { - return new DocumentContainer(dao.removeCommentFromAlert(request, latest)); - } catch (Throwable e) { - return new DocumentContainer(e); - } - }).collect(Collectors.toList()); - + List<DocumentContainer> output = indices + .parallelStream() + .map(dao -> removeCommentFromAlert(dao, request, latest)) + .collect(Collectors.toList()); return getLatestDocument(output); } + private DocumentContainer removeCommentFromAlert(IndexDao indexDao, CommentAddRemoveRequest request, Document latest) { + DocumentContainer container; + try { + Document document = indexDao.removeCommentFromAlert(request, latest); + container = new DocumentContainer(document); + LOG.debug("Removed comment from alert; indexDao={}, guid={}, sensorType={}, document={}", + ClassUtils.getShortClassName(indexDao.getClass()), document.getGuid(), document.getSensorType(), document); + + } catch (Throwable e) { + container = new DocumentContainer(e); + LOG.error("Unable to remove comment from alert; indexDao={}, error={}", + ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e)); + } + + return container; + } + protected static class DocumentContainer { private Optional<Document> d = Optional.empty(); private Optional<Throwable> t = Optional.empty(); @@ -226,18 +257,30 @@ public class MultiIndexDao implements IndexDao { @Override public Document getLatest(final String guid, String sensorType) throws IOException { - List<DocumentContainer> output = - indices.parallelStream().map(dao -> { - try { - return new DocumentContainer(dao.getLatest(guid, sensorType)); - } catch (Throwable e) { - return new DocumentContainer(e); - } - }).collect(Collectors.toList()); - + List<DocumentContainer> output = indices + .parallelStream() + .map(dao -> getLatest(dao, guid, sensorType)) + .collect(Collectors.toList()); return getLatestDocument(output); } + private DocumentContainer getLatest(IndexDao indexDao, String guid, String sensorType) { + DocumentContainer container; + try { + Document document = indexDao.getLatest(guid, sensorType); + container = new DocumentContainer(document); + LOG.debug("Found latest document; indexDao={}, guid={}, sensorType={}, document={}", + ClassUtils.getShortClassName(indexDao.getClass()), guid, sensorType, document); + + } catch (Throwable e) { + container = new DocumentContainer(e); + LOG.error("Unable to find latest document; indexDao={}, error={}", + ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e)); + } + + return container; + } + @Override public Iterable<Document> getAllLatest( List<GetRequest> getRequests) throws IOException { diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java index daa5424..b473200 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertConstants.java @@ -20,10 +20,24 @@ package org.apache.metron.indexing.dao.metaalert; public class MetaAlertConstants { public static String METAALERT_TYPE = "metaalert"; + + /** + * The name of the field in an alert that contains a list + * of GUIDs of all meta-alerts the alert is associated with. + * + * <p>Only standard, non-metaalerts will have this field. + */ public static String METAALERT_FIELD = "metaalerts"; public static String METAALERT_DOC = METAALERT_TYPE + "_doc"; public static String THREAT_FIELD_DEFAULT = "threat:triage:score"; public static String THREAT_SORT_DEFAULT = "sum"; + + /** + * The name of the field in a meta-alert that contains a list of + * all alerts associated with the meta-alert. + * + * <p>Only meta-alerts will have this field. + */ public static String ALERT_FIELD = "metron_alert"; public static String STATUS_FIELD = "status"; public static String GROUPS_FIELD = "groups"; diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java index 3686b19..0a028e5 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java @@ -21,19 +21,39 @@ package org.apache.metron.indexing.dao.update; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.metron.common.Constants.Fields.TIMESTAMP; +import static org.apache.metron.common.Constants.GUID; +import static org.apache.metron.common.Constants.SENSOR_TYPE; import org.apache.metron.common.utils.JSONUtils; public class Document { + Long timestamp; Map<String, Object> document; String guid; String sensorType; + String documentID; + + public static Document fromJSON(Map<String, Object> json) { + String guid = getGUID(json); + Long timestamp = getTimestamp(json).orElse(0L); + String sensorType = getSensorType(json); + return new Document(json, guid, sensorType, timestamp); + } public Document(Map<String, Object> document, String guid, String sensorType, Long timestamp) { + this(document, guid, sensorType, timestamp, null); + } + + public Document(Map<String, Object> document, String guid, String sensorType, Long timestamp, String documentID) { setDocument(document); setGuid(guid); setTimestamp(timestamp); setSensorType(sensorType); + setDocumentID(documentID); } public Document(String document, String guid, String sensorType, Long timestamp) throws IOException { @@ -41,7 +61,7 @@ public class Document { } public Document(String document, String guid, String sensorType) throws IOException { - this( document, guid, sensorType, null); + this(document, guid, sensorType, null); } /** @@ -49,8 +69,11 @@ public class Document { * @param other The document to be copied. */ public Document(Document other) { - this(new HashMap<>(other.getDocument()), other.getGuid(), other.getSensorType(), - other.getTimestamp()); + this(new HashMap<>(other.getDocument()), + other.getGuid(), + other.getSensorType(), + other.getTimestamp(), + other.getDocumentID().orElse(null)); } private static Map<String, Object> convertDoc(String document) throws IOException { @@ -89,46 +112,83 @@ public class Document { this.guid = guid; } - @Override - public String toString() { - return "Document{" + - "timestamp=" + timestamp + - ", document=" + document + - ", guid='" + guid + '\'' + - ", sensorType='" + sensorType + '\'' + - '}'; + /** + * Returns the unique identifier that is used when persisting this document. + * + * <p>This value will be different than the Metron guid. + * + * <p>Only present when a document has been retrieved from a store + * that supports a document ID, like Elasticsearch. This value will + * not be present when retrieved from HBase. + */ + public Optional<String> getDocumentID() { + return Optional.ofNullable(documentID); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; + public void setDocumentID(Optional<String> documentID) { + this.documentID = documentID.orElse(null); + } + + public void setDocumentID(String documentID) { + this.documentID = documentID; + } + + private static Optional<Long> getTimestamp(Map<String, Object> document) { + Object value = document.get(TIMESTAMP.getName()); + if(value != null && value instanceof Long) { + return Optional.of(Long.class.cast(value)); } - if (o == null || getClass() != o.getClass()) { - return false; + return Optional.empty(); + } + + private static String getGUID(Map<String, Object> document) { + Object value = document.get(GUID); + if(value != null && value instanceof String) { + return String.class.cast(value); } - Document document1 = (Document) o; + throw new IllegalStateException(String.format("Missing '%s' field", GUID)); + } - if (timestamp != null ? !timestamp.equals(document1.timestamp) : document1.timestamp != null) { - return false; - } - if (document != null ? !document.equals(document1.document) : document1.document != null) { - return false; + private static String getSensorType(Map<String, Object> document) { + Object value = document.get(SENSOR_TYPE); + if(value != null && value instanceof String) { + return String.class.cast(value); } - if (guid != null ? !guid.equals(document1.guid) : document1.guid != null) { - return false; + + value = document.get(SENSOR_TYPE.replace(".", ":")); + if(value != null && value instanceof String) { + return String.class.cast(value); } - return sensorType != null ? sensorType.equals(document1.sensorType) - : document1.sensorType == null; + + throw new IllegalStateException(String.format("Missing '%s' field", SENSOR_TYPE)); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Document)) return false; + Document document1 = (Document) o; + return Objects.equals(timestamp, document1.timestamp) && + Objects.equals(document, document1.document) && + Objects.equals(guid, document1.guid) && + Objects.equals(sensorType, document1.sensorType) && + Objects.equals(documentID, document1.documentID); } @Override public int hashCode() { - int result = timestamp != null ? timestamp.hashCode() : 0; - result = 31 * result + (document != null ? document.hashCode() : 0); - result = 31 * result + (guid != null ? guid.hashCode() : 0); - result = 31 * result + (sensorType != null ? sensorType.hashCode() : 0); - return result; + return Objects.hash(timestamp, document, guid, sensorType, documentID); + } + + @Override + public String toString() { + return "Document{" + + "timestamp=" + timestamp + + ", document=" + document + + ", guid='" + guid + '\'' + + ", sensorType='" + sensorType + '\'' + + ", documentID=" + documentID + + '}'; } } diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java index 82f0a49..ef1d298 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.util.Map; import java.util.Optional; +import static java.lang.String.format; + public interface UpdateDao { /** @@ -55,7 +57,6 @@ public interface UpdateDao { Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) throws IOException; - /** * Update a document in an index given a JSON Patch (see RFC 6902 at * https://tools.ietf.org/html/rfc6902) @@ -73,23 +74,28 @@ public interface UpdateDao { } default Document getPatchedDocument(RetrieveLatestDao retrieveLatestDao, PatchRequest request, - Optional<Long> timestamp + Optional<Long> optionalTimestamp ) throws OriginalNotFoundException, IOException { - Map<String, Object> latest = request.getSource(); - if (latest == null) { - Document latestDoc = retrieveLatestDao.getLatest(request.getGuid(), request.getSensorType()); - if (latestDoc != null && latestDoc.getDocument() != null) { - latest = latestDoc.getDocument(); + String guid = request.getGuid(); + String sensorType = request.getSensorType(); + String documentID = null; + Long timestamp = optionalTimestamp.orElse(System.currentTimeMillis()); + + Map<String, Object> originalSource = request.getSource(); + if (originalSource == null) { + // no document source provided, lookup the latest + Document toPatch = retrieveLatestDao.getLatest(guid, sensorType); + if(toPatch != null && toPatch.getDocument() != null) { + originalSource = toPatch.getDocument(); + documentID = toPatch.getDocumentID().orElse(null); + } else { - throw new OriginalNotFoundException( - "Unable to patch an document that doesn't exist and isn't specified."); + String error = format("Document does not exist, but is required; guid=%s, sensorType=%s", guid, sensorType); + throw new OriginalNotFoundException(error); } } - Map<String, Object> updated = JSONUtils.INSTANCE.applyPatch(request.getPatch(), latest); - return new Document(updated, - request.getGuid(), - request.getSensorType(), - timestamp.orElse(System.currentTimeMillis())); + Map<String, Object> patchedSource = JSONUtils.INSTANCE.applyPatch(request.getPatch(), originalSource); + return new Document(patchedSource, guid, sensorType, timestamp, documentID); } } diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java index f1355a6..5a18fc5 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java @@ -55,6 +55,7 @@ import org.apache.metron.indexing.dao.search.SortOrder; import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.update.OriginalNotFoundException; import org.apache.metron.indexing.dao.update.PatchRequest; +import org.apache.metron.integration.utils.TestUtils; import org.json.simple.parser.ParseException; import org.junit.Assert; import org.junit.Test; @@ -231,8 +232,8 @@ public abstract class MetaAlertIntegrationTest { SearchResponse result = metaDao.search(sr); List<SearchResult> results = result.getResults(); Assert.assertEquals(2, results.size()); - Assert.assertEquals("meta_active_0", results.get((0)).getId()); - Assert.assertEquals("message_1", results.get((1)).getId()); + Assert.assertEquals("meta_active_0", results.get((0)).getSource().get(Constants.GUID)); + Assert.assertEquals("message_1", results.get((1)).getSource().get(Constants.GUID)); // Test ascending SortField sfAsc = new SortField(); @@ -245,8 +246,8 @@ public abstract class MetaAlertIntegrationTest { srAsc.setSort(Collections.singletonList(sfAsc)); result = metaDao.search(srAsc); results = result.getResults(); - Assert.assertEquals("message_1", results.get((0)).getId()); - Assert.assertEquals("meta_active_0", results.get((1)).getId()); + Assert.assertEquals("message_1", results.get((0)).getSource().get(Constants.GUID)); + Assert.assertEquals("meta_active_0", results.get((1)).getSource().get(Constants.GUID)); Assert.assertEquals(2, results.size()); } @@ -856,92 +857,72 @@ public abstract class MetaAlertIntegrationTest { @Test public abstract void shouldSearchByNestedAlert() throws Exception; - @SuppressWarnings("unchecked") + /** + * If a meta-alert is active, any updates to alerts associated with a meta-alert + * should be reflected in both the original alert and the copy contained within + * the meta-alert. + */ @Test public void shouldUpdateMetaAlertOnAlertUpdate() throws Exception { - // Load alerts - List<Map<String, Object>> alerts = buildAlerts(2); - alerts.get(0).put(METAALERT_FIELD, Arrays.asList("meta_active", "meta_inactive")); - addRecords(alerts, getTestIndexFullName(), SENSOR_NAME); - - // Load metaAlerts - Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE, - Optional.of(Collections.singletonList(alerts.get(0)))); - Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", - MetaAlertStatus.INACTIVE, - Optional.of(Collections.singletonList(alerts.get(0)))); - // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically. - addRecords(Arrays.asList(activeMetaAlert, inactiveMetaAlert), getMetaAlertIndex(), - METAALERT_TYPE); - - // Verify load was successful - findCreatedDocs(Arrays.asList( - new GetRequest("message_0", SENSOR_NAME), - new GetRequest("message_1", SENSOR_NAME), - new GetRequest("meta_active", METAALERT_TYPE), - new GetRequest("meta_inactive", METAALERT_TYPE))); - + final String expectedFieldValue = "metron"; { - // Modify the first message and add a new field - Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) { - { - put(NEW_FIELD, "metron"); - put(THREAT_FIELD_DEFAULT, 10.0d); - } - }; - String guid = "" + message0.get(Constants.GUID); - metaDao.update(new Document(message0, guid, SENSOR_NAME, null), - Optional.of(getTestIndexFullName())); - - { - // Verify alerts are up-to-date - findUpdatedDoc(message0, guid, SENSOR_NAME); - long cnt = getMatchingAlertCount(NEW_FIELD, message0.get(NEW_FIELD)); - if (cnt == 0) { - Assert.fail("Alert not updated!"); - } - } - - { - // Verify meta alerts are up-to-date - long cnt = getMatchingMetaAlertCount(NEW_FIELD, "metron"); - if (cnt == 0) { - Assert.fail("Active metaalert was not updated!"); - } - if (cnt != 1) { - Assert.fail("Metaalerts not updated correctly!"); - } - } + // create 2 'regular' alerts that will be associated with meta-alerts + List<Map<String, Object>> alerts = buildAlerts(2); + alerts.get(0).put(METAALERT_FIELD, Arrays.asList("meta_active", "meta_inactive")); + addRecords(alerts, getTestIndexFullName(), SENSOR_NAME); + + // the active meta-alert should be updated when an associated alert is updated + Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE, + Optional.of(Collections.singletonList(alerts.get(0)))); + + // the inactive meta-alert should NOT be updated when an associated alert is updated + Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE, + Optional.of(Collections.singletonList(alerts.get(0)))); + + // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically. + addRecords(Arrays.asList(activeMetaAlert, inactiveMetaAlert), getMetaAlertIndex(), METAALERT_TYPE); + + // Verify load was successful + findCreatedDocs(Arrays.asList( + new GetRequest("message_0", SENSOR_NAME), + new GetRequest("message_1", SENSOR_NAME), + new GetRequest("meta_active", METAALERT_TYPE), + new GetRequest("meta_inactive", METAALERT_TYPE))); } - //modify the same message and modify the new field { - Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) { - { - put(NEW_FIELD, "metron2"); - } - }; - String guid = "" + message0.get(Constants.GUID); - metaDao.update(new Document(message0, guid, SENSOR_NAME, null), Optional.empty()); - - { - // Verify index is up-to-date - findUpdatedDoc(message0, guid, SENSOR_NAME); - long cnt = getMatchingAlertCount(NEW_FIELD, message0.get(NEW_FIELD)); - if (cnt == 0) { - Assert.fail("Alert not updated!"); - } - } - { - // Verify meta alerts are up-to-date - long cnt = getMatchingMetaAlertCount(NEW_FIELD, "metron2"); - if (cnt == 0) { - Assert.fail("Active metaalert was not updated!"); - } - if (cnt != 1) { - Assert.fail("Metaalerts not updated correctly!"); - } - } + // modify the 'normal' alert by adding a field + Document message0 = metaDao.getLatest("message_0", SENSOR_NAME); + message0.getDocument().put(NEW_FIELD, expectedFieldValue); + message0.getDocument().put(THREAT_FIELD_DEFAULT, 10.0d); + metaDao.update(message0, Optional.of(getTestIndexFullName())); } + + // ensure the original 'normal' alert was itself updated + assertEventually(() -> { + Document message0 = metaDao.getLatest("message_0", SENSOR_NAME); + Assert.assertNotNull(message0); + Assert.assertEquals(expectedFieldValue, message0.getDocument().get(NEW_FIELD)); + }); + + // the 'active' meta-alert, which contains a copy of the updated alert should also be updated + assertEventually(() -> { + Document active = metaDao.getLatest("meta_active", METAALERT_TYPE); + Object value = active.getDocument().get(ALERT_FIELD); + List<Map<String, Object>> children = List.class.cast(value); + Assert.assertNotNull(children); + Assert.assertEquals(1, children.size()); + Assert.assertEquals(expectedFieldValue, children.get(0).get(NEW_FIELD)); + }); + + // the 'inactive' meta-alert, which contains a copy of the updated alert should NOT be updated + assertEventually(() -> { + Document inactive = metaDao.getLatest("meta_inactive", METAALERT_TYPE); + Object value = inactive.getDocument().get(ALERT_FIELD); + List<Map<String, Object>> children = List.class.cast(value); + Assert.assertNotNull(children); + Assert.assertEquals(1, children.size()); + Assert.assertFalse(children.get(0).containsKey(NEW_FIELD)); + }); } @Test @@ -957,7 +938,7 @@ public abstract class MetaAlertIntegrationTest { } @Test - public void shouldPatchAllowedMetaAlerts() throws Exception { + public void shouldPatchMetaAlertFields() throws Exception { // Load alerts List<Map<String, Object>> alerts = buildAlerts(2); alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active")); @@ -969,63 +950,108 @@ public abstract class MetaAlertIntegrationTest { // Load metaAlerts Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE, - Optional.of(Arrays.asList(alerts.get(0), alerts.get(1)))); + Optional.of(Arrays.asList(alerts.get(0), alerts.get(1)))); // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically. addRecords(Collections.singletonList(metaAlert), getMetaAlertIndex(), METAALERT_TYPE); - // Verify load was successful + // ensure the test data was loaded findCreatedDocs(Arrays.asList( - new GetRequest("message_0", SENSOR_NAME), - new GetRequest("message_1", SENSOR_NAME), - new GetRequest("meta_alert", METAALERT_TYPE))); + new GetRequest("message_0", SENSOR_NAME), + new GetRequest("message_1", SENSOR_NAME), + new GetRequest("meta_alert", METAALERT_TYPE))); - Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert); - expectedMetaAlert.put(NAME_FIELD, "New Meta Alert"); - { - // Verify a patch to a field other than "status" or "alert" can be patched - String namePatch = namePatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex()); - PatchRequest patchRequest = JSONUtils.INSTANCE.load(namePatch, PatchRequest.class); + // patch the name field + String namePatch = namePatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex()); + PatchRequest patchRequest = JSONUtils.INSTANCE.load(namePatch, PatchRequest.class); + metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis())); + + // ensure the alert was patched + assertEventually(() -> { + Document updated = metaDao.getLatest("meta_alert", METAALERT_TYPE); + Assert.assertEquals("New Meta Alert", updated.getDocument().get(NAME_FIELD)); + }); + } + + @Test + public void shouldThrowExceptionIfPatchAlertField() throws Exception { + setupTypings(); + + // add 2 alerts to an active meta-alert + List<Map<String, Object>> alerts = buildAlerts(2); + alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active")); + alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_active")); + addRecords(alerts, getTestIndexFullName(), SENSOR_NAME); + + // create an active meta-alert + Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE, + Optional.of(Arrays.asList(alerts.get(0), alerts.get(1)))); + addRecords(Collections.singletonList(metaAlert), getMetaAlertIndex(), METAALERT_TYPE); + + // ensure the test data was loaded + findCreatedDocs(Arrays.asList( + new GetRequest("message_0", SENSOR_NAME), + new GetRequest("message_1", SENSOR_NAME), + new GetRequest("meta_alert", METAALERT_TYPE))); + + // attempt to patch the alert field + try { + String alertPatch = alertPatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex()); + PatchRequest patchRequest = JSONUtils.INSTANCE.load(alertPatch, PatchRequest.class); metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis())); + Assert.fail("A patch on the alert field should throw an exception"); - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); + } catch (IllegalArgumentException iae) { + Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths. " + + "Please use the add/remove alert or update status functions instead.", + iae.getMessage()); } - { - // Verify a patch to an alert field should throw an exception - try { - String alertPatch = alertPatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex()); - PatchRequest patchRequest = JSONUtils.INSTANCE.load(alertPatch, PatchRequest.class); - metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis())); - - Assert.fail("A patch on the alert field should throw an exception"); - } catch (IllegalArgumentException iae) { - Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths. " - + "Please use the add/remove alert or update status functions instead.", - iae.getMessage()); - } + // ensure the alert field was NOT changed + assertEventually(() -> { + Document updated = metaDao.getLatest("meta_alert", METAALERT_TYPE); + Assert.assertEquals(metaAlert.get(ALERT_FIELD), updated.getDocument().get(ALERT_FIELD)); + }); + } - // Verify the metaAlert was not updated - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } + @Test + public void shouldThrowExceptionIfPatchStatusField() throws Exception { + setupTypings(); - { - // Verify a patch to a status field should throw an exception - try { - String statusPatch = statusPatchRequest - .replace(META_INDEX_FLAG, getMetaAlertIndex()); - PatchRequest patchRequest = JSONUtils.INSTANCE.load(statusPatch, PatchRequest.class); - metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis())); - - Assert.fail("A patch on the status field should throw an exception"); - } catch (IllegalArgumentException iae) { - Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths. " - + "Please use the add/remove alert or update status functions instead.", - iae.getMessage()); - } + // add 2 alerts to an active meta-alert + List<Map<String, Object>> alerts = buildAlerts(2); + alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active")); + alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_active")); + addRecords(alerts, getTestIndexFullName(), SENSOR_NAME); - // Verify the metaAlert was not updated - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); + // create an active meta-alert + Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE, + Optional.of(Arrays.asList(alerts.get(0), alerts.get(1)))); + addRecords(Collections.singletonList(metaAlert), getMetaAlertIndex(), METAALERT_TYPE); + + // ensure the test data was loaded + findCreatedDocs(Arrays.asList( + new GetRequest("message_0", SENSOR_NAME), + new GetRequest("message_1", SENSOR_NAME), + new GetRequest("meta_alert", METAALERT_TYPE))); + + // Verify a patch to a status field should throw an exception + try { + String statusPatch = statusPatchRequest.replace(META_INDEX_FLAG, getMetaAlertIndex()); + PatchRequest patchRequest = JSONUtils.INSTANCE.load(statusPatch, PatchRequest.class); + metaDao.patch(metaDao, patchRequest, Optional.of(System.currentTimeMillis())); + Assert.fail("A patch on the status field should throw an exception"); + + } catch (IllegalArgumentException iae) { + Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths. " + + "Please use the add/remove alert or update status functions instead.", + iae.getMessage()); } + + // ensure the status field was NOT changed + assertEventually(() -> { + Document updated = metaDao.getLatest("meta_alert", METAALERT_TYPE); + Assert.assertEquals(metaAlert.get(STATUS_FIELD), updated.getDocument().get(STATUS_FIELD)); + }); } protected void findUpdatedDoc(Map<String, Object> message0, String guid, String sensorType)