METRON-1421 Create a SolrMetaAlertDao (justinleet) closes apache/metron#970
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/49f851e0 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/49f851e0 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/49f851e0 Branch: refs/heads/master Commit: 49f851e0b8c2ffa1cdd7c8f169bed3dfa07cf35c Parents: eb33666 Author: justinleet <justinjl...@gmail.com> Authored: Wed May 23 10:32:34 2018 -0400 Committer: leet <l...@apache.org> Committed: Wed May 23 10:32:34 2018 -0400 ---------------------------------------------------------------------- metron-analytics/metron-profiler/.gitignore | 1 + .../apache/metron/rest/config/IndexConfig.java | 12 +- .../rest/service/impl/MetaAlertServiceImpl.java | 5 +- .../rest/service/impl/SearchServiceImpl.java | 2 +- .../rest/service/impl/UpdateServiceImpl.java | 2 +- .../MetaAlertControllerIntegrationTest.java | 6 +- .../UpdateControllerIntegrationTest.java | 4 +- .../elasticsearch/dao/ElasticsearchDao.java | 65 +- .../dao/ElasticsearchMetaAlertDao.java | 641 ++--------- ...ElasticsearchMetaAlertRetrieveLatestDao.java | 44 + .../dao/ElasticsearchMetaAlertSearchDao.java | 110 ++ .../dao/ElasticsearchMetaAlertUpdateDao.java | 219 ++++ .../dao/ElasticsearchRetrieveLatestDao.java | 151 +++ .../dao/ElasticsearchSearchDao.java | 102 -- .../dao/ElasticsearchUpdateDao.java | 10 +- .../elasticsearch/utils/ElasticsearchUtils.java | 64 ++ .../elasticsearch/dao/ElasticsearchDaoTest.java | 70 +- .../dao/ElasticsearchMetaAlertDaoTest.java | 164 +-- .../ElasticsearchMetaAlertIntegrationTest.java | 986 ++--------------- .../ElasticsearchSearchIntegrationTest.java | 64 +- .../ElasticsearchUpdateIntegrationTest.java | 84 +- .../components/ElasticSearchComponent.java | 26 +- metron-platform/metron-indexing/README.md | 2 +- metron-platform/metron-indexing/pom.xml | 8 +- .../metron/indexing/dao/AccessConfig.java | 10 + .../apache/metron/indexing/dao/IndexDao.java | 141 +-- .../metron/indexing/dao/MetaAlertDao.java | 154 --- .../metron/indexing/dao/RetrieveLatestDao.java | 67 ++ .../metaalert/DeferredMetaAlertIndexDao.java | 42 + .../metaalert/MetaAlertAddRemoveRequest.java | 1 - .../indexing/dao/metaalert/MetaAlertConfig.java | 74 ++ .../dao/metaalert/MetaAlertConstants.java | 30 + .../indexing/dao/metaalert/MetaAlertDao.java | 77 ++ .../metaalert/MetaAlertRetrieveLatestDao.java | 25 + .../dao/metaalert/MetaAlertSearchDao.java | 35 + .../dao/metaalert/MetaAlertUpdateDao.java | 146 +++ .../indexing/dao/metaalert/MetaScores.java | 52 +- .../AbstractLuceneMetaAlertUpdateDao.java | 334 ++++++ .../metron/indexing/dao/search/SearchDao.java | 22 +- .../indexing/dao/search/SearchResponse.java | 10 +- .../metron/indexing/dao/update/PatchUtil.java | 50 + .../metron/indexing/dao/update/UpdateDao.java | 47 + .../metron/indexing/util/IndexingCacheUtil.java | 35 + .../indexing/dao/InMemoryMetaAlertDao.java | 69 +- .../indexing/dao/SearchIntegrationTest.java | 60 +- .../indexing/dao/UpdateIntegrationTest.java | 87 +- .../dao/metaalert/MetaAlertIntegrationTest.java | 1012 ++++++++++++++++++ .../indexing/dao/metaalert/MetaScoresTest.java | 75 ++ .../AbstractLuceneMetaAlertUpdateDaoTest.java | 854 +++++++++++++++ .../integration/IndexingIntegrationTest.java | 4 +- metron-platform/metron-pcap-backend/.gitignore | 1 + metron-platform/metron-solr/pom.xml | 4 +- .../src/main/config/schema/bro/schema.xml | 3 + .../src/main/config/schema/metaalert/schema.xml | 39 +- .../src/main/config/schema/snort/schema.xml | 3 + .../src/main/config/schema/yaf/schema.xml | 3 + .../org/apache/metron/solr/dao/SolrDao.java | 37 +- .../metron/solr/dao/SolrMetaAlertDao.java | 285 +++-- .../dao/SolrMetaAlertRetrieveLatestDao.java | 77 ++ .../metron/solr/dao/SolrMetaAlertSearchDao.java | 211 ++++ .../metron/solr/dao/SolrMetaAlertUpdateDao.java | 216 ++++ .../metron/solr/dao/SolrRetrieveLatestDao.java | 81 ++ .../apache/metron/solr/dao/SolrSearchDao.java | 127 +-- .../apache/metron/solr/dao/SolrUpdateDao.java | 51 +- .../apache/metron/solr/dao/SolrUtilities.java | 92 ++ .../org/apache/metron/solr/dao/SolrDaoTest.java | 61 +- .../metron/solr/dao/SolrMetaAlertDaoTest.java | 137 +++ .../metron/solr/dao/SolrSearchDaoTest.java | 176 ++- .../metron/solr/dao/SolrUpdateDaoTest.java | 19 +- .../metron/solr/dao/SolrUtilitiesTest.java | 48 + .../SolrIndexingIntegrationTest.java | 5 +- .../SolrMetaAlertIntegrationTest.java | 397 +++++++ .../integration/SolrSearchIntegrationTest.java | 59 +- .../integration/SolrUpdateIntegrationTest.java | 87 +- .../integration/components/SolrComponent.java | 94 +- .../schema/SchemaValidationIntegrationTest.java | 9 +- .../metron/solr/writer/SolrWriterTest.java | 23 +- .../resources/config/test/conf/managed-schema | 84 +- 78 files changed, 6051 insertions(+), 2733 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-analytics/metron-profiler/.gitignore ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/.gitignore b/metron-analytics/metron-profiler/.gitignore new file mode 100644 index 0000000..df1a13b --- /dev/null +++ b/metron-analytics/metron-profiler/.gitignore @@ -0,0 +1 @@ +/logs \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java index 635d1de..c432c6c 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java @@ -20,12 +20,14 @@ package org.apache.metron.rest.config; import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL; import java.util.Optional; +import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.hbase.HTableProvider; import org.apache.metron.hbase.TableProvider; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.IndexDaoFactory; -import org.apache.metron.indexing.dao.MetaAlertDao; +import org.apache.metron.indexing.dao.metaalert.MetaAlertDao; +import org.apache.metron.indexing.util.IndexingCacheUtil; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.GlobalConfigService; @@ -34,10 +36,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - @Configuration public class IndexConfig { @@ -45,6 +43,9 @@ public class IndexConfig { private GlobalConfigService globalConfigService; @Autowired + private ConfigurationsCache cache; + + @Autowired private Environment environment; @Autowired @@ -72,6 +73,7 @@ public class IndexConfig { throw new IllegalStateException("Unable to retrieve the global config.", e); } }); + config.setIndexSupplier(IndexingCacheUtil.getIndexLookupFunction(cache)); config.setTableProvider(TableProvider.create(hbaseProviderImpl, () -> new HTableProvider())); config.setKerberosEnabled(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)); if (indexDaoImpl == null) { http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java index aafab24..3f9b3e4 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java @@ -19,16 +19,14 @@ package org.apache.metron.rest.service.impl; import java.io.IOException; -import java.util.Collection; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.MetaAlertDao; +import org.apache.metron.indexing.dao.metaalert.MetaAlertDao; import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; import org.apache.metron.indexing.dao.search.InvalidCreateException; import org.apache.metron.indexing.dao.search.InvalidSearchException; -import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.MetaAlertService; @@ -48,7 +46,6 @@ public class MetaAlertServiceImpl implements MetaAlertService { this.environment = environment; } - @Override public MetaAlertCreateResponse create(MetaAlertCreateRequest createRequest) throws RestException { try { http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java index 21d158f..82b9c11 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java @@ -18,7 +18,7 @@ package org.apache.metron.rest.service.impl; import static org.apache.metron.common.Constants.ERROR_TYPE; -import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERT_TYPE; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE; import static org.apache.metron.rest.MetronRestConstants.INDEX_WRITER_NAME; import static org.apache.metron.rest.MetronRestConstants.SEARCH_FACET_FIELDS_SPRING_PROPERTY; http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java index 76ac75d..6a42248 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/UpdateServiceImpl.java @@ -44,7 +44,7 @@ public class UpdateServiceImpl implements UpdateService { @Override public void patch(PatchRequest request) throws RestException, OriginalNotFoundException { try { - dao.patch(request, Optional.of(System.currentTimeMillis())); + dao.patch(dao, request, Optional.of(System.currentTimeMillis())); } catch (Exception e) { throw new RestException(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java index 3e69e37..9200fd1 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java @@ -30,13 +30,10 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import org.adrianwalker.multilinestring.Multiline; import org.apache.curator.framework.CuratorFramework; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.InMemoryMetaAlertDao; -import org.apache.metron.indexing.dao.MetaAlertDao; import org.apache.metron.indexing.dao.SearchIntegrationTest; import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; @@ -75,6 +72,7 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest { private String metaalertUrl = "/api/v1/metaalert"; private String user = "user"; private String password = "password"; + private String metaAlertIndex = "metaalert_index"; /** { @@ -111,7 +109,7 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest { ImmutableMap<String, String> testData = ImmutableMap.of( "bro_index_2017.01.01.01", SearchIntegrationTest.broData, "snort_index_2017.01.01.01", SearchIntegrationTest.snortData, - MetaAlertDao.METAALERTS_INDEX, metaAlertData + metaAlertIndex, metaAlertData ); loadTestData(testData); } http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java index e8d00d3..e437325 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.indexing.dao.HBaseDao; -import org.apache.metron.indexing.dao.MetaAlertDao; import org.apache.metron.indexing.dao.SearchIntegrationTest; import org.apache.metron.rest.service.UpdateService; import org.junit.Assert; @@ -72,6 +71,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest { private String searchUrl = "/api/v1/search"; private String user = "user"; private String password = "password"; + private String metaAlertIndex = "metaalert_index"; /** { @@ -121,7 +121,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest { ImmutableMap<String, String> testData = ImmutableMap.of( "bro_index_2017.01.01.01", SearchIntegrationTest.broData, "snort_index_2017.01.01.01", SearchIntegrationTest.snortData, - MetaAlertDao.METAALERTS_INDEX, MetaAlertControllerIntegrationTest.metaAlertData + metaAlertIndex, MetaAlertControllerIntegrationTest.metaAlertData ); loadTestData(testData); } http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index a09086a..246de6a 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -24,8 +24,8 @@ import java.util.Map; import java.util.Optional; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; -import org.apache.metron.indexing.dao.ColumnMetadataDao; import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.RetrieveLatestDao; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.GroupRequest; @@ -34,6 +34,9 @@ import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.update.Document; +import org.apache.metron.indexing.dao.update.OriginalNotFoundException; +import org.apache.metron.indexing.dao.update.PatchRequest; +import org.apache.metron.indexing.dao.update.ReplaceRequest; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.index.query.QueryBuilder; import org.slf4j.Logger; @@ -46,6 +49,7 @@ public class ElasticsearchDao implements IndexDao { private transient TransportClient client; private ElasticsearchSearchDao searchDao; private ElasticsearchUpdateDao updateDao; + private ElasticsearchRetrieveLatestDao retrieveLatestDao; /** * Retrieves column metadata about search indices. @@ -63,12 +67,14 @@ public class ElasticsearchDao implements IndexDao { AccessConfig config, ElasticsearchSearchDao searchDao, ElasticsearchUpdateDao updateDao, + ElasticsearchRetrieveLatestDao retrieveLatestDao, ElasticsearchColumnMetadataDao columnMetadataDao, ElasticsearchRequestSubmitter requestSubmitter - ) { + ) { this.client = client; this.searchDao = searchDao; this.updateDao = updateDao; + this.retrieveLatestDao = retrieveLatestDao; this.columnMetadataDao = columnMetadataDao; this.requestSubmitter = requestSubmitter; this.accessConfig = config; @@ -78,32 +84,25 @@ public class ElasticsearchDao implements IndexDao { //uninitialized. } - public ElasticsearchDao columnMetadataDao(ElasticsearchColumnMetadataDao columnMetadataDao) { - this.columnMetadataDao = columnMetadataDao; - return this; - } - - public ElasticsearchDao accessConfig(AccessConfig accessConfig) { - this.accessConfig = accessConfig; - return this; - } - @Override public synchronized void init(AccessConfig config) { - if(this.client == null) { - this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get()); + if (this.client == null) { + this.client = ElasticsearchUtils + .getClient(config.getGlobalConfigSupplier().get()); this.accessConfig = config; this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin()); this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client); - this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao, requestSubmitter); - this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, searchDao); + this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao, + requestSubmitter); + this.retrieveLatestDao = new ElasticsearchRetrieveLatestDao(client); + this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, retrieveLatestDao); } - if(columnMetadataDao == null) { + if (columnMetadataDao == null) { throw new IllegalArgumentException("No ColumnMetadataDao available"); } - if(requestSubmitter == null) { + if (requestSubmitter == null) { throw new IllegalArgumentException("No ElasticsearchRequestSubmitter available"); } } @@ -119,14 +118,14 @@ public class ElasticsearchDao implements IndexDao { } @Override - public Document getLatest(final String guid, final String sensorType) throws IOException { - return searchDao.getLatest(guid, sensorType); + public Document getLatest(final String guid, final String sensorType) { + return retrieveLatestDao.getLatest(guid, sensorType); } @Override public Iterable<Document> getAllLatest( - final List<GetRequest> getRequests) throws IOException { - return searchDao.getAllLatest(getRequests); + final List<GetRequest> getRequests) { + return retrieveLatestDao.getAllLatest(getRequests); } @Override @@ -140,19 +139,37 @@ public class ElasticsearchDao implements IndexDao { } @Override + public void patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request, Optional<Long> timestamp) + throws OriginalNotFoundException, IOException { + updateDao.patch(retrieveLatestDao, request, timestamp); + } + + @Override + public void replace(ReplaceRequest request, Optional<Long> timestamp) throws IOException { + updateDao.replace(request, timestamp); + } + + @Override public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException { return this.columnMetadataDao.getColumnMetadata(indices); } + @Override + public Optional<Map<String, Object>> getLatestResult(GetRequest request) throws IOException { + return retrieveLatestDao.getLatestResult(request); + } + protected Optional<String> getIndexName(String guid, String sensorType) { return updateDao.getIndexName(guid, sensorType); } - protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException { + protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) + throws InvalidSearchException { return searchDao.search(request, queryBuilder); } - protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) throws InvalidSearchException { + protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) + throws InvalidSearchException { return searchDao.group(groupRequest, queryBuilder); } http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java index 2311a2b..faec939 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java @@ -18,29 +18,21 @@ package org.apache.metron.elasticsearch.dao; -import static org.apache.metron.common.Constants.GUID; -import static org.elasticsearch.index.query.QueryBuilders.boolQuery; -import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery; -import static org.elasticsearch.index.query.QueryBuilders.existsQuery; -import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; -import static org.elasticsearch.index.query.QueryBuilders.termQuery; - -import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; -import java.util.*; -import java.util.Map.Entry; -import java.util.stream.Collectors; -import org.apache.commons.collections4.SetUtils; -import org.apache.lucene.search.join.ScoreMode; +import java.util.List; +import java.util.Map; +import java.util.Optional; import org.apache.metron.common.Constants; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.MetaAlertDao; import org.apache.metron.indexing.dao.MultiIndexDao; +import org.apache.metron.indexing.dao.RetrieveLatestDao; +import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig; +import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.metaalert.MetaAlertDao; import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; -import org.apache.metron.indexing.dao.metaalert.MetaScores; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.GroupRequest; @@ -49,59 +41,36 @@ import org.apache.metron.indexing.dao.search.InvalidCreateException; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; -import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.indexing.dao.update.Document; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.get.MultiGetItemResponse; -import org.elasticsearch.action.get.MultiGetRequest.Item; -import org.elasticsearch.action.get.MultiGetRequestBuilder; -import org.elasticsearch.action.get.MultiGetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.query.InnerHitBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.query.QueryStringQueryBuilder; -import org.elasticsearch.search.SearchHit; import org.apache.metron.indexing.dao.update.OriginalNotFoundException; import org.apache.metron.indexing.dao.update.PatchRequest; -import org.apache.metron.stellar.common.utils.ConversionUtils; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.query.QueryStringQueryBuilder; public class ElasticsearchMetaAlertDao implements MetaAlertDao { - public static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':'); - private static final String STATUS_PATH = "/status"; - private static final String ALERT_PATH = "/alert"; + public static final String THREAT_TRIAGE_FIELD = MetaAlertConstants.THREAT_FIELD_DEFAULT + .replace('.', ':'); + public static final String METAALERTS_INDEX = "metaalert_index"; + + public static final String SOURCE_TYPE_FIELD = Constants.SENSOR_TYPE.replace('.', ':'); + protected String metaAlertsIndex = METAALERTS_INDEX; + protected String threatTriageField = THREAT_TRIAGE_FIELD; + protected String threatSort = MetaAlertConstants.THREAT_SORT_DEFAULT; - private IndexDao indexDao; private ElasticsearchDao elasticsearchDao; - private String index = METAALERTS_INDEX; - private String threatTriageField = THREAT_FIELD_DEFAULT; + private IndexDao indexDao; + private ElasticsearchMetaAlertSearchDao metaAlertSearchDao; + private ElasticsearchMetaAlertUpdateDao metaAlertUpdateDao; + private ElasticsearchMetaAlertRetrieveLatestDao metaAlertRetrieveLatestDao; - /** - * Defines which summary aggregation is used to represent the overall threat triage score for - * the metaalert. The summary aggregation is applied to the threat triage score of all child alerts. - * - * This overall score is primarily used for sorting; hence it is called the 'threatSort'. This - * can be either max, min, average, count, median, or sum. - */ - private String threatSort = THREAT_SORT_DEFAULT; - private int pageSize = 500; + protected int pageSize = 500; /** * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts. * @param indexDao The Dao to wrap */ public ElasticsearchMetaAlertDao(IndexDao indexDao) { - this(indexDao, METAALERTS_INDEX, THREAT_FIELD_DEFAULT, THREAT_SORT_DEFAULT); + this(indexDao, METAALERTS_INDEX, MetaAlertConstants.THREAT_FIELD_DEFAULT, + MetaAlertConstants.THREAT_SORT_DEFAULT); } /** @@ -112,10 +81,13 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { * as the overall threat triage score for the metaalert. This * can be either max, min, average, count, median, or sum. */ - public ElasticsearchMetaAlertDao(IndexDao indexDao, String index, String triageLevelField, String threatSort) { + public ElasticsearchMetaAlertDao(IndexDao indexDao, String metaAlertsIndex, + String triageLevelField, + String threatSort) { init(indexDao, Optional.of(threatSort)); - this.index = index; this.threatTriageField = triageLevelField; + this.threatSort = threatSort; + this.metaAlertsIndex = metaAlertsIndex; } public ElasticsearchMetaAlertDao() { @@ -123,8 +95,10 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { } /** - * Initializes this implementation by setting the supplied IndexDao and also setting a separate ElasticsearchDao. - * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for example). + * Initializes this implementation by setting the supplied IndexDao and also setting a separate + * ElasticsearchDao. + * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for + * example). * @param indexDao The DAO to wrap for our queries * @param threatSort The summary aggregation of the child threat triage scores used * as the overall threat triage score for the metaalert. This @@ -152,6 +126,24 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { if (threatSort.isPresent()) { this.threatSort = threatSort.get(); } + + MetaAlertConfig config = new MetaAlertConfig( + metaAlertsIndex, + threatTriageField, + this.threatSort, + ElasticsearchMetaAlertDao.SOURCE_TYPE_FIELD + ); + + this.metaAlertSearchDao = new ElasticsearchMetaAlertSearchDao( + elasticsearchDao, + config, + pageSize); + this.metaAlertRetrieveLatestDao = new ElasticsearchMetaAlertRetrieveLatestDao(indexDao); + this.metaAlertUpdateDao = new ElasticsearchMetaAlertUpdateDao( + elasticsearchDao, + metaAlertRetrieveLatestDao, + config, + pageSize); } @Override @@ -160,551 +152,74 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { } @Override - public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException { - if (guid == null || guid.trim().isEmpty()) { - throw new InvalidSearchException("Guid cannot be empty"); - } - // Searches for all alerts containing the meta alert guid in it's "metalerts" array - QueryBuilder qb = boolQuery() - .must( - nestedQuery( - ALERT_FIELD, - boolQuery() - .must(termQuery(ALERT_FIELD + "." + GUID, guid)), - ScoreMode.None - ).innerHit(new InnerHitBuilder()) - ) - .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())); - return queryAllResults(qb); + public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException { + return indexDao.getColumnMetadata(indices); } @Override - @SuppressWarnings("unchecked") - public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request) - throws InvalidCreateException, IOException { - List<GetRequest> alertRequests = request.getAlerts(); - if (request.getAlerts().isEmpty()) { - throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts"); - } - if (request.getGroups().isEmpty()) { - throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups"); - } - - // Retrieve the documents going into the meta alert and build it - Iterable<Document> alerts = indexDao.getAllLatest(alertRequests); - - Document metaAlert = buildCreateDocument(alerts, request.getGroups()); - calculateMetaScores(metaAlert); - // Add source type to be consistent with other sources and allow filtering - metaAlert.getDocument().put(SOURCE_TYPE, MetaAlertDao.METAALERT_TYPE); - - // Start a list of updates / inserts we need to run - Map<Document, Optional<String>> updates = new HashMap<>(); - updates.put(metaAlert, Optional.of(MetaAlertDao.METAALERTS_INDEX)); - - try { - // We need to update the associated alerts with the new meta alerts, making sure existing - // links are maintained. - Map<String, Optional<String>> guidToIndices = alertRequests.stream().collect(Collectors.toMap( - GetRequest::getGuid, GetRequest::getIndex)); - Map<String, String> guidToSensorTypes = alertRequests.stream().collect(Collectors.toMap( - GetRequest::getGuid, GetRequest::getSensorType)); - for (Document alert: alerts) { - if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) { - // Use the index in the request if it exists - Optional<String> index = guidToIndices.get(alert.getGuid()); - if (!index.isPresent()) { - // Look up the index from Elasticsearch if one is not supplied in the request - index = elasticsearchDao.getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid())); - if (!index.isPresent()) { - throw new IllegalArgumentException("Could not find index for " + alert.getGuid()); - } - } - updates.put(alert, index); - } - } - - // Kick off any updates. - indexDaoUpdate(updates); - - MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse(); - createResponse.setCreated(true); - createResponse.setGuid(metaAlert.getGuid()); - return createResponse; - } catch (IOException ioe) { - throw new InvalidCreateException("Unable to create meta alert", ioe); - } + public Document getLatest(String guid, String sensorType) throws IOException { + return indexDao.getLatest(guid, sensorType); } @Override - public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) - throws IOException { - Map<Document, Optional<String>> updates = new HashMap<>(); - Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE); - if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) { - Iterable<Document> alerts = indexDao.getAllLatest(alertRequests); - boolean metaAlertUpdated = addAlertsToMetaAlert(metaAlert, alerts); - if (metaAlertUpdated) { - calculateMetaScores(metaAlert); - updates.put(metaAlert, Optional.of(index)); - for(Document alert: alerts) { - if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) { - updates.put(alert, Optional.empty()); - } - } - indexDaoUpdate(updates); - } - return metaAlertUpdated; - } else { - throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed"); - } + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { + return indexDao.getAllLatest(getRequests); } - protected boolean addAlertsToMetaAlert(Document metaAlert, Iterable<Document> alerts) { - boolean alertAdded = false; - List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD); - Set<String> currentAlertGuids = currentAlerts.stream().map(currentAlert -> - (String) currentAlert.get(GUID)).collect(Collectors.toSet()); - for (Document alert: alerts) { - String alertGuid = alert.getGuid(); - // Only add an alert if it isn't already in the meta alert - if (!currentAlertGuids.contains(alertGuid)) { - currentAlerts.add(alert.getDocument()); - alertAdded = true; - } - } - return alertAdded; + @Override + public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException { + return metaAlertSearchDao.getAllMetaAlertsForAlert(guid); } - protected boolean addMetaAlertToAlert(String metaAlertGuid, Document alert) { - List<String> metaAlertField = new ArrayList<>(); - List<String> alertField = (List<String>) alert.getDocument() - .get(MetaAlertDao.METAALERT_FIELD); - if (alertField != null) { - metaAlertField.addAll(alertField); - } - boolean metaAlertAdded = !metaAlertField.contains(metaAlertGuid); - if (metaAlertAdded) { - metaAlertField.add(metaAlertGuid); - alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField); - } - return metaAlertAdded; + @Override + public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request) + throws InvalidCreateException, IOException { + return metaAlertUpdateDao.createMetaAlert(request); } @Override - public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) + public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) throws IOException { - Map<Document, Optional<String>> updates = new HashMap<>(); - Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE); - if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) { - Iterable<Document> alerts = indexDao.getAllLatest(alertRequests); - Collection<String> alertGuids = alertRequests.stream().map(GetRequest::getGuid).collect( - Collectors.toList()); - boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, alertGuids); - if (metaAlertUpdated) { - calculateMetaScores(metaAlert); - updates.put(metaAlert, Optional.of(index)); - for(Document alert: alerts) { - if (removeMetaAlertFromAlert(metaAlert.getGuid(), alert)) { - updates.put(alert, Optional.empty()); - } - } - indexDaoUpdate(updates); - } - return metaAlertUpdated; - } else { - throw new IllegalStateException("Removing alerts from an INACTIVE meta alert is not allowed"); - } - + return metaAlertUpdateDao.addAlertsToMetaAlert(metaAlertGuid, alertRequests); } - protected boolean removeAlertsFromMetaAlert(Document metaAlert, Collection<String> alertGuids) { - List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD); - int previousSize = currentAlerts.size(); - // Only remove an alert if it is in the meta alert - currentAlerts.removeIf(currentAlert -> alertGuids.contains((String) currentAlert.get(GUID))); - return currentAlerts.size() != previousSize; - } - - protected boolean removeMetaAlertFromAlert(String metaAlertGuid, Document alert) { - List<String> metaAlertField = new ArrayList<>(); - List<String> alertField = (List<String>) alert.getDocument() - .get(MetaAlertDao.METAALERT_FIELD); - if (alertField != null) { - metaAlertField.addAll(alertField); - } - boolean metaAlertRemoved = metaAlertField.remove(metaAlertGuid); - if (metaAlertRemoved) { - alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField); - } - return metaAlertRemoved; + @Override + public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) + throws IOException { + return metaAlertUpdateDao.removeAlertsFromMetaAlert(metaAlertGuid, alertRequests); } @Override public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) throws IOException { - Map<Document, Optional<String>> updates = new HashMap<>(); - Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE); - String currentStatus = (String) metaAlert.getDocument().get(MetaAlertDao.STATUS_FIELD); - boolean metaAlertUpdated = !status.getStatusString().equals(currentStatus); - if (metaAlertUpdated) { - metaAlert.getDocument().put(MetaAlertDao.STATUS_FIELD, status.getStatusString()); - updates.put(metaAlert, Optional.of(index)); - List<GetRequest> getRequests = new ArrayList<>(); - List<Map<String, Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument() - .get(MetaAlertDao.ALERT_FIELD); - currentAlerts.stream().forEach(currentAlert -> { - getRequests.add(new GetRequest((String) currentAlert.get(GUID), (String) currentAlert.get(SOURCE_TYPE))); - }); - Iterable<Document> alerts = indexDao.getAllLatest(getRequests); - for (Document alert : alerts) { - boolean metaAlertAdded = false; - boolean metaAlertRemoved = false; - // If we're making it active add add the meta alert guid for every alert. - if (MetaAlertStatus.ACTIVE.equals(status)) { - metaAlertAdded = addMetaAlertToAlert(metaAlert.getGuid(), alert); - } - // If we're making it inactive, remove the meta alert guid from every alert. - if (MetaAlertStatus.INACTIVE.equals(status)) { - metaAlertRemoved = removeMetaAlertFromAlert(metaAlert.getGuid(), alert); - } - if (metaAlertAdded || metaAlertRemoved) { - updates.put(alert, Optional.empty()); - } - } - } - if (metaAlertUpdated) { - indexDaoUpdate(updates); - } - return metaAlertUpdated; + return metaAlertUpdateDao.updateMetaAlertStatus(metaAlertGuid, status); } @Override public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { - // Wrap the query to also get any meta-alerts. - QueryBuilder qb = constantScoreQuery(boolQuery() - .must(boolQuery() - .should(new QueryStringQueryBuilder(searchRequest.getQuery())) - .should(nestedQuery( - ALERT_FIELD, - new QueryStringQueryBuilder(searchRequest.getQuery()), - ScoreMode.None - ) - ) - ) - // Ensures that it's a meta alert with active status or that it's an alert (signified by - // having no status field) - .must(boolQuery() - .should(termQuery(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())) - .should(boolQuery().mustNot(existsQuery(MetaAlertDao.STATUS_FIELD))) - ) - .mustNot(existsQuery(MetaAlertDao.METAALERT_FIELD)) - ); - return elasticsearchDao.search(searchRequest, qb); + return metaAlertSearchDao.search(searchRequest); } @Override - public Document getLatest(String guid, String sensorType) throws IOException { - return indexDao.getLatest(guid, sensorType); - } - - @Override - public Iterable<Document> getAllLatest( - List<GetRequest> getRequests) throws IOException { - return indexDao.getAllLatest(getRequests); + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + return metaAlertSearchDao.group(groupRequest); } @Override public void update(Document update, Optional<String> index) throws IOException { - if (METAALERT_TYPE.equals(update.getSensorType())) { - // We've been passed an update to the meta alert. - throw new UnsupportedOperationException("Meta alerts cannot be directly updated"); - } else { - Map<Document, Optional<String>> updates = new HashMap<>(); - updates.put(update, index); - // We need to update an alert itself. Only that portion of the update can be delegated. - // We still need to get meta alerts potentially associated with it and update. - Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream() - .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), METAALERT_TYPE, 0L)) - .collect(Collectors.toList()); - // Each meta alert needs to be updated with the new alert - for (Document metaAlert : metaAlerts) { - replaceAlertInMetaAlert(metaAlert, update); - updates.put(metaAlert, Optional.of(METAALERTS_INDEX)); - } - - // Run the alert's update - indexDao.batchUpdate(updates); - } - } - - protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) { - boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, Collections.singleton(alert.getGuid())); - if (metaAlertUpdated) { - addAlertsToMetaAlert(metaAlert, Collections.singleton(alert)); - } - return metaAlertUpdated; + metaAlertUpdateDao.update(update, index); } @Override - public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException { - throw new UnsupportedOperationException("Meta alerts do not allow for bulk updates"); + public void batchUpdate(Map<Document, Optional<String>> updates) { + metaAlertUpdateDao.batchUpdate(updates); } - /** - * Does not allow patches on the "alerts" or "status" fields. These fields must be updated with their - * dedicated methods. - * - * @param request The patch request - * @param timestamp Optionally a timestamp to set. If not specified then current time is used. - * @throws OriginalNotFoundException - * @throws IOException - */ @Override - public void patch(PatchRequest request, Optional<Long> timestamp) + public void patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request, + Optional<Long> timestamp) throws OriginalNotFoundException, IOException { - if (isPatchAllowed(request)) { - Document d = getPatchedDocument(request, timestamp); - indexDao.update(d, Optional.ofNullable(request.getIndex())); - } else { - throw new IllegalArgumentException("Meta alert patches are not allowed for /alert or /status paths. " - + "Please use the add/remove alert or update status functions instead."); - } - } - - protected boolean isPatchAllowed(PatchRequest request) { - if(request.getPatch() != null && !request.getPatch().isEmpty()) { - for(Map<String, Object> patch : request.getPatch()) { - Object pathObj = patch.get("path"); - if(pathObj != null && pathObj instanceof String) { - String path = (String)pathObj; - if (STATUS_PATH.equals(path) || ALERT_PATH.equals(path)) { - return false; - } - } - } - } - return true; - } - - /** - * Given an alert GUID, retrieve all associated meta alerts. - * @param alertGuid The GUID of the child alert - * @return The Elasticsearch response containing the meta alerts - */ - protected SearchResponse getMetaAlertsForAlert(String alertGuid) { - QueryBuilder qb = boolQuery() - .must( - nestedQuery( - ALERT_FIELD, - boolQuery() - .must(termQuery(ALERT_FIELD + "." + Constants.GUID, alertGuid)), - ScoreMode.None - ).innerHit(new InnerHitBuilder()) - ) - .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())); - return queryAllResults(qb); - } - - /** - * Elasticsearch queries default to 10 records returned. Some internal queries require that all - * results are returned. Rather than setting an arbitrarily high size, this method pages through results - * and returns them all in a single SearchResponse. - * @param qb - * @return - */ - protected SearchResponse queryAllResults(QueryBuilder qb) { - SearchRequestBuilder searchRequestBuilder = elasticsearchDao - .getClient() - .prepareSearch(index) - .addStoredField("*") - .setFetchSource(true) - .setQuery(qb) - .setSize(pageSize); - org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder - .execute() - .actionGet(); - List<SearchResult> allResults = getSearchResults(esResponse); - long total = esResponse.getHits().getTotalHits(); - if (total > pageSize) { - int pages = (int) (total / pageSize) + 1; - for (int i = 1; i < pages; i++) { - int from = i * pageSize; - searchRequestBuilder.setFrom(from); - esResponse = searchRequestBuilder - .execute() - .actionGet(); - allResults.addAll(getSearchResults(esResponse)); - } - } - SearchResponse searchResponse = new SearchResponse(); - searchResponse.setTotal(total); - searchResponse.setResults(allResults); - return searchResponse; - } - - /** - * Transforms a list of Elasticsearch SearchHits to a list of SearchResults - * @param searchResponse - * @return - */ - protected List<SearchResult> getSearchResults(org.elasticsearch.action.search.SearchResponse searchResponse) { - return Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> { - SearchResult searchResult = new SearchResult(); - searchResult.setId(searchHit.getId()); - searchResult.setSource(searchHit.getSource()); - searchResult.setScore(searchHit.getScore()); - searchResult.setIndex(searchHit.getIndex()); - return searchResult; - } - ).collect(Collectors.toList()); - } - - /** - * Build the Document representing a meta alert to be created. - * @param alerts The Elasticsearch results for the meta alerts child documents - * @param groups The groups used to create this meta alert - * @return A Document representing the new meta alert - */ - protected Document buildCreateDocument(Iterable<Document> alerts, List<String> groups) { - // Need to create a Document from the multiget. Scores will be calculated later - Map<String, Object> metaSource = new HashMap<>(); - List<Map<String, Object>> alertList = new ArrayList<>(); - for (Document alert: alerts) { - alertList.add(alert.getDocument()); - } - metaSource.put(ALERT_FIELD, alertList); - - // Add any meta fields - String guid = UUID.randomUUID().toString(); - metaSource.put(GUID, guid); - metaSource.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis()); - metaSource.put(GROUPS_FIELD, groups); - metaSource.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()); - - return new Document(metaSource, guid, METAALERT_TYPE, System.currentTimeMillis()); - } - - /** - * Calls the single update variant if there's only one update, otherwise calls batch. - * @param updates The list of updates to run - * @throws IOException If there's an update error - */ - protected void indexDaoUpdate(Map<Document, Optional<String>> updates) throws IOException { - if (updates.size() == 1) { - Entry<Document, Optional<String>> singleUpdate = updates.entrySet().iterator().next(); - indexDao.update(singleUpdate.getKey(), singleUpdate.getValue()); - } else if (updates.size() > 1) { - indexDao.batchUpdate(updates); - } // else we have no updates, so don't do anything - } - - - - @SuppressWarnings("unchecked") - protected List<Map<String, Object>> getAllAlertsForMetaAlert(Document update) throws IOException { - Document latest = indexDao.getLatest(update.getGuid(), MetaAlertDao.METAALERT_TYPE); - if (latest == null) { - return new ArrayList<>(); - } - List<String> guids = new ArrayList<>(); - List<Map<String, Object>> latestAlerts = (List<Map<String, Object>>) latest.getDocument() - .get(MetaAlertDao.ALERT_FIELD); - for (Map<String, Object> alert : latestAlerts) { - guids.add((String) alert.get(Constants.GUID)); - } - - List<Map<String, Object>> alerts = new ArrayList<>(); - QueryBuilder query = QueryBuilders.idsQuery().addIds(guids.toArray(new String[0])); - SearchRequestBuilder request = elasticsearchDao.getClient().prepareSearch() - .setQuery(query); - org.elasticsearch.action.search.SearchResponse response = request.get(); - for (SearchHit hit : response.getHits().getHits()) { - alerts.add(hit.sourceAsMap()); - } - return alerts; - } - - /** - * Builds an update Document for updating the meta alerts list. - * @param alertGuid The GUID of the alert to update - * @param sensorType The sensor type to update - * @param metaAlertField The new metaAlertList to use - * @return The update Document - */ - protected Document buildAlertUpdate(String alertGuid, String sensorType, - List<String> metaAlertField, Long timestamp) { - Document alertUpdate; - Map<String, Object> document = new HashMap<>(); - document.put(MetaAlertDao.METAALERT_FIELD, metaAlertField); - alertUpdate = new Document( - document, - alertGuid, - sensorType, - timestamp - ); - return alertUpdate; - } - - - @Override - public Map<String, FieldType> getColumnMetadata(List<String> indices) - throws IOException { - return indexDao.getColumnMetadata(indices); - } - - @Override - public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { - // Wrap the query to hide any alerts already contained in meta alerts - QueryBuilder qb = QueryBuilders.boolQuery() - .must(new QueryStringQueryBuilder(groupRequest.getQuery())) - .mustNot(existsQuery(MetaAlertDao.METAALERT_FIELD)); - return elasticsearchDao.group(groupRequest, qb); - } - - /** - * Calculate the meta alert scores for a Document. - * @param metaAlert The Document containing scores - * @return Set of score statistics - */ - @SuppressWarnings("unchecked") - protected void calculateMetaScores(Document metaAlert) { - MetaScores metaScores = new MetaScores(new ArrayList<>()); - List<Object> alertsRaw = ((List<Object>) metaAlert.getDocument().get(ALERT_FIELD)); - if (alertsRaw != null && !alertsRaw.isEmpty()) { - ArrayList<Double> scores = new ArrayList<>(); - for (Object alertRaw : alertsRaw) { - Map<String, Object> alert = (Map<String, Object>) alertRaw; - Double scoreNum = parseThreatField(alert.get(threatTriageField)); - if (scoreNum != null) { - scores.add(scoreNum); - } - } - metaScores = new MetaScores(scores); - } - - // add a summary (max, min, avg, ...) of all the threat scores from the child alerts - metaAlert.getDocument().putAll(metaScores.getMetaScores()); - - // add the overall threat score for the metaalert; one of the summary aggregations as defined by `threatSort` - Object threatScore = metaScores.getMetaScores().get(threatSort); - - // add the threat score as a float; type needs to match the threat score field from each of the sensor indices - metaAlert.getDocument().put(threatTriageField, ConversionUtils.convert(threatScore, Float.class)); - } - - private Double parseThreatField(Object threatRaw) { - Double threat = null; - if (threatRaw instanceof Number) { - threat = ((Number) threatRaw).doubleValue(); - } else if (threatRaw instanceof String) { - threat = Double.parseDouble((String) threatRaw); - } - return threat; - } - - public int getPageSize() { - return pageSize; + metaAlertUpdateDao.patch(retrieveLatestDao, request, timestamp); } public void setPageSize(int pageSize) { http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java new file mode 100644 index 0000000..8aa55d6 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertRetrieveLatestDao.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import java.io.IOException; +import java.util.List; +import org.apache.metron.indexing.dao.RetrieveLatestDao; +import org.apache.metron.indexing.dao.metaalert.MetaAlertRetrieveLatestDao; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.update.Document; + +public class ElasticsearchMetaAlertRetrieveLatestDao implements MetaAlertRetrieveLatestDao { + private RetrieveLatestDao retrieveLatestDao; + + public ElasticsearchMetaAlertRetrieveLatestDao(RetrieveLatestDao retrieveLatestDao) { + this.retrieveLatestDao = retrieveLatestDao; + } + + @Override + public Document getLatest(String guid, String sensorType) throws IOException { + return retrieveLatestDao.getLatest(guid, sensorType); + } + + @Override + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { + return retrieveLatestDao.getAllLatest(getRequests); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java new file mode 100644 index 0000000..00fc9d0 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import static org.apache.metron.common.Constants.GUID; +import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.queryAllResults; +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; +import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery; +import static org.elasticsearch.index.query.QueryBuilders.existsQuery; +import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; + +import org.apache.lucene.search.join.ScoreMode; +import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig; +import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants; +import org.apache.metron.indexing.dao.metaalert.MetaAlertSearchDao; +import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.elasticsearch.index.query.InnerHitBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryStringQueryBuilder; + +public class ElasticsearchMetaAlertSearchDao implements MetaAlertSearchDao { + + protected ElasticsearchDao elasticsearchDao; + private MetaAlertConfig config; + private int pageSize; + + public ElasticsearchMetaAlertSearchDao(ElasticsearchDao elasticsearchDao, + MetaAlertConfig config, int pageSize) { + this.elasticsearchDao = elasticsearchDao; + this.config = config; + this.pageSize = pageSize; + } + + @Override + public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + // Wrap the query to also get any meta-alerts. + QueryBuilder qb = constantScoreQuery(boolQuery() + .must(boolQuery() + .should(new QueryStringQueryBuilder(searchRequest.getQuery())) + .should(nestedQuery( + MetaAlertConstants.ALERT_FIELD, + new QueryStringQueryBuilder(searchRequest.getQuery()), + ScoreMode.None + ) + ) + ) + // Ensures that it's a meta alert with active status or that it's an alert (signified by + // having no status field) + .must(boolQuery() + .should(termQuery(MetaAlertConstants.STATUS_FIELD, + MetaAlertStatus.ACTIVE.getStatusString())) + .should(boolQuery().mustNot(existsQuery(MetaAlertConstants.STATUS_FIELD))) + ) + .mustNot(existsQuery(MetaAlertConstants.METAALERT_FIELD)) + ); + return elasticsearchDao.search(searchRequest, qb); + } + + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + // Wrap the query to hide any alerts already contained in meta alerts + QueryBuilder qb = QueryBuilders.boolQuery() + .must(new QueryStringQueryBuilder(groupRequest.getQuery())) + .mustNot(existsQuery(MetaAlertConstants.METAALERT_FIELD)); + return elasticsearchDao.group(groupRequest, qb); + } + + @Override + public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException { + if (guid == null || guid.trim().isEmpty()) { + throw new InvalidSearchException("Guid cannot be empty"); + } + // Searches for all alerts containing the meta alert guid in it's "metalerts" array + QueryBuilder qb = boolQuery() + .must( + nestedQuery( + MetaAlertConstants.ALERT_FIELD, + boolQuery() + .must(termQuery(MetaAlertConstants.ALERT_FIELD + "." + GUID, guid)), + ScoreMode.None + ).innerHit(new InnerHitBuilder()) + ) + .must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())); + return queryAllResults(elasticsearchDao.getClient(), qb, config.getMetaAlertIndex(), + pageSize); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java new file mode 100644 index 0000000..6c709a6 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; +import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.lucene.search.join.ScoreMode; +import org.apache.metron.common.Constants; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig; +import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.metaalert.MetaAlertRetrieveLatestDao; +import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; +import org.apache.metron.indexing.dao.metaalert.MetaScores; +import org.apache.metron.indexing.dao.metaalert.lucene.AbstractLuceneMetaAlertUpdateDao; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.InvalidCreateException; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.index.query.InnerHitBuilder; +import org.elasticsearch.index.query.QueryBuilder; + +public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao { + + private ElasticsearchDao elasticsearchDao; + private MetaAlertRetrieveLatestDao retrieveLatestDao; + private int pageSize; + + /** + * Constructor an ElasticsearchMetaAlertUpdateDao + * @param elasticsearchDao An UpdateDao to defer queries to. + * @param retrieveLatestDao A RetrieveLatestDao for getting the current state of items being + * mutated. + * @param config The meta alert config to use. + */ + public ElasticsearchMetaAlertUpdateDao( + ElasticsearchDao elasticsearchDao, + MetaAlertRetrieveLatestDao retrieveLatestDao, + MetaAlertConfig config, + int pageSize + ) { + super(elasticsearchDao, retrieveLatestDao, config); + this.elasticsearchDao = elasticsearchDao; + this.retrieveLatestDao = retrieveLatestDao; + this.pageSize = pageSize; + } + + @Override + @SuppressWarnings("unchecked") + public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request) + throws InvalidCreateException, IOException { + List<GetRequest> alertRequests = request.getAlerts(); + if (request.getAlerts().isEmpty()) { + throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts"); + } + if (request.getGroups().isEmpty()) { + throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups"); + } + + // Retrieve the documents going into the meta alert and build it + Iterable<Document> alerts = retrieveLatestDao.getAllLatest(alertRequests); + + Document metaAlert = buildCreateDocument(alerts, request.getGroups(), + MetaAlertConstants.ALERT_FIELD); + MetaScores + .calculateMetaScores(metaAlert, getConfig().getThreatTriageField(), + getConfig().getThreatSort()); + // Add source type to be consistent with other sources and allow filtering + metaAlert.getDocument() + .put(ElasticsearchMetaAlertDao.SOURCE_TYPE_FIELD, MetaAlertConstants.METAALERT_TYPE); + + // Start a list of updates / inserts we need to run + Map<Document, Optional<String>> updates = new HashMap<>(); + updates.put(metaAlert, Optional.of(getConfig().getMetaAlertIndex())); + + try { + // We need to update the associated alerts with the new meta alerts, making sure existing + // links are maintained. + Map<String, Optional<String>> guidToIndices = alertRequests.stream().collect(Collectors.toMap( + GetRequest::getGuid, GetRequest::getIndex)); + Map<String, String> guidToSensorTypes = alertRequests.stream().collect(Collectors.toMap( + GetRequest::getGuid, GetRequest::getSensorType)); + for (Document alert : alerts) { + if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) { + // Use the index in the request if it exists + Optional<String> index = guidToIndices.get(alert.getGuid()); + if (!index.isPresent()) { + // Look up the index from Elasticsearch if one is not supplied in the request + index = elasticsearchDao + .getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid())); + if (!index.isPresent()) { + throw new IllegalArgumentException("Could not find index for " + alert.getGuid()); + } + } + updates.put(alert, index); + } + } + + // Kick off any updates. + update(updates); + + MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse(); + createResponse.setCreated(true); + createResponse.setGuid(metaAlert.getGuid()); + return createResponse; + } catch (IOException ioe) { + throw new InvalidCreateException("Unable to create meta alert", ioe); + } + } + + /** + * Adds alerts to a metaalert, based on a list of GetRequests provided for retrieval. + * @param metaAlertGuid The GUID of the metaalert to be given new children. + * @param alertRequests GetRequests for the appropriate alerts to add. + * @return True if metaalert is modified, false otherwise. + */ + public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) + throws IOException { + + Document metaAlert = retrieveLatestDao + .getLatest(metaAlertGuid, MetaAlertConstants.METAALERT_TYPE); + if (MetaAlertStatus.ACTIVE.getStatusString() + .equals(metaAlert.getDocument().get(MetaAlertConstants.STATUS_FIELD))) { + Iterable<Document> alerts = retrieveLatestDao.getAllLatest(alertRequests); + Map<Document, Optional<String>> updates = buildAddAlertToMetaAlertUpdates(metaAlert, alerts); + update(updates); + return updates.size() != 0; + } else { + throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed"); + } + } + + @Override + public void update(Document update, Optional<String> index) throws IOException { + if (MetaAlertConstants.METAALERT_TYPE.equals(update.getSensorType())) { + // We've been passed an update to the meta alert. + throw new UnsupportedOperationException("Meta alerts cannot be directly updated"); + } else { + Map<Document, Optional<String>> updates = new HashMap<>(); + updates.put(update, index); + // We need to update an alert itself. Only that portion of the update can be delegated. + // We still need to get meta alerts potentially associated with it and update. + Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults() + .stream() + .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), + MetaAlertConstants.METAALERT_TYPE, 0L)) + .collect(Collectors.toList()); + // Each meta alert needs to be updated with the new alert + for (Document metaAlert : metaAlerts) { + if (replaceAlertInMetaAlert(metaAlert, update)) { + updates.put(metaAlert, Optional.of(getConfig().getMetaAlertIndex())); + } + } + + // Run the alert's update + elasticsearchDao.batchUpdate(updates); + } + } + + /** + * Given an alert GUID, retrieve all associated meta alerts. + * @param alertGuid The GUID of the child alert + * @return The Elasticsearch response containing the meta alerts + */ + protected SearchResponse getMetaAlertsForAlert(String alertGuid) { + QueryBuilder qb = boolQuery() + .must( + nestedQuery( + MetaAlertConstants.ALERT_FIELD, + boolQuery() + .must(termQuery(MetaAlertConstants.ALERT_FIELD + "." + Constants.GUID, + alertGuid)), + ScoreMode.None + ).innerHit(new InnerHitBuilder()) + ) + .must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())); + return ElasticsearchUtils + .queryAllResults(elasticsearchDao.getClient(), qb, getConfig().getMetaAlertIndex(), + pageSize); + } + + + protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) { + boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, + Collections.singleton(alert.getGuid())); + if (metaAlertUpdated) { + addAlertsToMetaAlert(metaAlert, Collections.singleton(alert)); + } + return metaAlertUpdated; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java new file mode 100644 index 0000000..f6bfeda --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import org.apache.metron.indexing.dao.RetrieveLatestDao; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; + +public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao { + + private TransportClient transportClient; + + public ElasticsearchRetrieveLatestDao(TransportClient transportClient) { + this.transportClient = transportClient; + } + + @Override + public Document getLatest(String guid, String sensorType) { + Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)); + return doc.orElse(null); + } + + @Override + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) { + Collection<String> guids = new HashSet<>(); + Collection<String> sensorTypes = new HashSet<>(); + for (GetRequest getRequest : getRequests) { + guids.add(getRequest.getGuid()); + sensorTypes.add(getRequest.getSensorType()); + } + List<Document> documents = searchByGuids( + guids, + sensorTypes, + hit -> { + Long ts = 0L; + String doc = hit.getSourceAsString(); + String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); + try { + return Optional.of(new Document(doc, hit.getId(), sourceType, ts)); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); + } + } + + ); + return documents; + } + + <T> Optional<T> searchByGuid(String guid, String sensorType, + Function<SearchHit, Optional<T>> callback) { + Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null; + List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback); + if (results.size() > 0) { + return Optional.of(results.get(0)); + } else { + return Optional.empty(); + } + } + + /** + * Return the search hit based on the UUID and sensor type. + * A callback can be specified to transform the hit into a type T. + * If more than one hit happens, the first one will be returned. + */ + <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes, + Function<SearchHit, Optional<T>> callback) { + if (guids == null || guids.isEmpty()) { + return Collections.emptyList(); + } + QueryBuilder query = null; + IdsQueryBuilder idsQuery; + if (sensorTypes != null) { + String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc") + .toArray(String[]::new); + idsQuery = QueryBuilders.idsQuery(types); + } else { + idsQuery = QueryBuilders.idsQuery(); + } + + for (String guid : guids) { + query = idsQuery.addIds(guid); + } + + SearchRequestBuilder request = transportClient.prepareSearch() + .setQuery(query) + .setSize(guids.size()); + org.elasticsearch.action.search.SearchResponse response = request.get(); + SearchHits hits = response.getHits(); + List<T> results = new ArrayList<>(); + for (SearchHit hit : hits) { + Optional<T> result = callback.apply(hit); + if (result.isPresent()) { + results.add(result.get()); + } + } + return results; + } + + private Optional<Document> toDocument(final String guid, SearchHit hit) { + Long ts = 0L; + String doc = hit.getSourceAsString(); + String sourceType = toSourceType(hit.getType()); + try { + return Optional.of(new Document(doc, guid, sourceType, ts)); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); + } + } + + /** + * Returns the source type based on a given doc type. + * @param docType The document type. + * @return The source type. + */ + private String toSourceType(String docType) { + return Iterables.getFirst(Splitter.on("_doc").split(docType), null); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java index 3971237..5725534 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java @@ -116,49 +116,6 @@ public class ElasticsearchSearchDao implements SearchDao { return group(groupRequest, new QueryStringQueryBuilder(groupRequest.getQuery())); } - @Override - public Document getLatest(String guid, String sensorType) throws IOException { - Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)); - return doc.orElse(null); - } - - <T> Optional<T> searchByGuid(String guid, String sensorType, - Function<SearchHit, Optional<T>> callback) { - Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null; - List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback); - if (results.size() > 0) { - return Optional.of(results.get(0)); - } else { - return Optional.empty(); - } - } - - @Override - public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { - Collection<String> guids = new HashSet<>(); - Collection<String> sensorTypes = new HashSet<>(); - for (GetRequest getRequest: getRequests) { - guids.add(getRequest.getGuid()); - sensorTypes.add(getRequest.getSensorType()); - } - List<Document> documents = searchByGuids( - guids - , sensorTypes - , hit -> { - Long ts = 0L; - String doc = hit.getSourceAsString(); - String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); - try { - return Optional.of(new Document(doc, hit.getId(), sourceType, ts)); - } catch (IOException e) { - throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); - } - } - - ); - return documents; - } - /** * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. * @param request The request defining the parameters of the search @@ -505,63 +462,4 @@ public class ElasticsearchSearchDao implements SearchDao { } return searchResultGroups; } - - /** - * Return the search hit based on the UUID and sensor type. - * A callback can be specified to transform the hit into a type T. - * If more than one hit happens, the first one will be returned. - */ - <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes, - Function<SearchHit, Optional<T>> callback) { - if(guids == null || guids.isEmpty()) { - return Collections.EMPTY_LIST; - } - QueryBuilder query = null; - IdsQueryBuilder idsQuery = null; - if (sensorTypes != null) { - String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new); - idsQuery = QueryBuilders.idsQuery(types); - } else { - idsQuery = QueryBuilders.idsQuery(); - } - - for(String guid : guids) { - query = idsQuery.addIds(guid); - } - - SearchRequestBuilder request = client.prepareSearch() - .setQuery(query) - .setSize(guids.size()) - ; - org.elasticsearch.action.search.SearchResponse response = request.get(); - SearchHits hits = response.getHits(); - List<T> results = new ArrayList<>(); - for (SearchHit hit : hits) { - Optional<T> result = callback.apply(hit); - if (result.isPresent()) { - results.add(result.get()); - } - } - return results; - } - - private Optional<Document> toDocument(final String guid, SearchHit hit) { - Long ts = 0L; - String doc = hit.getSourceAsString(); - String sourceType = toSourceType(hit.getType()); - try { - return Optional.of(new Document(doc, guid, sourceType, ts)); - } catch (IOException e) { - throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); - } - } - - /** - * Returns the source type based on a given doc type. - * @param docType The document type. - * @return The source type. - */ - private String toSourceType(String docType) { - return Iterables.getFirst(Splitter.on("_doc").split(docType), null); - } } http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java index a7c3a71..c4d7412 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java @@ -42,14 +42,14 @@ public class ElasticsearchUpdateDao implements UpdateDao { private transient TransportClient client; private AccessConfig accessConfig; - private ElasticsearchSearchDao searchDao; + private ElasticsearchRetrieveLatestDao retrieveLatestDao; public ElasticsearchUpdateDao(TransportClient client, AccessConfig accessConfig, - ElasticsearchSearchDao searchDao) { + ElasticsearchRetrieveLatestDao searchDao) { this.client = client; this.accessConfig = accessConfig; - this.searchDao = searchDao; + this.retrieveLatestDao = searchDao; } @Override @@ -110,7 +110,7 @@ public class ElasticsearchUpdateDao implements UpdateDao { } protected Optional<String> getIndexName(String guid, String sensorType) { - return searchDao.searchByGuid(guid, + return retrieveLatestDao.searchByGuid(guid, sensorType, hit -> Optional.ofNullable(hit.getIndex()) ); @@ -121,7 +121,7 @@ public class ElasticsearchUpdateDao implements UpdateDao { Object ts = update.getTimestamp(); IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid()) .source(update.getDocument()); - if(ts != null) { + if (ts != null) { indexRequest = indexRequest.timestamp(ts.toString()); }