ATLAS-1947: AtlasSearchResult to include referredEntity headers
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/bcec42e3 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/bcec42e3 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/bcec42e3 Branch: refs/heads/feature-odf Commit: bcec42e3306c9517c1ded5e7ed538c76cfd29c33 Parents: 0d8f9f8 Author: apoorvnaik <apoorvn...@apache.org> Authored: Thu Jul 13 09:03:06 2017 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Fri Jul 14 16:02:38 2017 -0700 ---------------------------------------------------------------------- .../model/discovery/AtlasSearchResult.java | 78 ++- .../atlas/model/discovery/SearchParameters.java | 34 +- .../atlas/discovery/AtlasDiscoveryService.java | 2 +- .../ClassificationSearchProcessor.java | 198 ++++++ .../atlas/discovery/EntityDiscoveryService.java | 138 +++-- .../atlas/discovery/EntitySearchProcessor.java | 203 ++++++ .../discovery/FullTextSearchProcessor.java | 110 ++++ .../org/apache/atlas/discovery/GremlinStep.java | 389 ------------ .../apache/atlas/discovery/SearchContext.java | 126 ++++ .../apache/atlas/discovery/SearchPipeline.java | 611 ------------------- .../apache/atlas/discovery/SearchProcessor.java | 381 ++++++++++++ .../org/apache/atlas/discovery/SolrStep.java | 288 --------- .../store/graph/v1/EntityGraphRetriever.java | 20 +- .../org/apache/atlas/util/SearchTracker.java | 16 +- .../test/java/org/apache/atlas/TestModules.java | 3 - .../atlas/web/resources/AdminResource.java | 4 +- .../apache/atlas/web/rest/DiscoveryREST.java | 2 +- 17 files changed, 1216 insertions(+), 1387 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/bcec42e3/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java 
b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java index 9513dcb..5827440 100644 --- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java +++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java @@ -31,6 +31,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; @@ -40,14 +41,15 @@ import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONL @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) public class AtlasSearchResult implements Serializable { - private AtlasQueryType queryType; - private SearchParameters searchParameters; - private String queryText; - private String type; - private String classification; - private List<AtlasEntityHeader> entities; - private AttributeSearchResult attributes; - private List<AtlasFullTextResult> fullTextResult; + private AtlasQueryType queryType; + private SearchParameters searchParameters; + private String queryText; + private String type; + private String classification; + private List<AtlasEntityHeader> entities; + private AttributeSearchResult attributes; + private List<AtlasFullTextResult> fullTextResult; + private Map<String, AtlasEntityHeader> referredEntities; public AtlasSearchResult() {} @@ -62,6 +64,7 @@ public class AtlasSearchResult implements Serializable { setEntities(null); setAttributes(null); setFullTextResult(null); + setReferredEntities(null); } public AtlasSearchResult(SearchParameters searchParameters) { @@ -73,6 +76,7 @@ public class AtlasSearchResult implements Serializable { setEntities(null); setAttributes(null); setFullTextResult(null); + setReferredEntities(null); } } @@ -80,6 +84,14 @@ public class AtlasSearchResult implements Serializable { public void 
setQueryType(AtlasQueryType queryType) { this.queryType = queryType; } + public SearchParameters getSearchParameters() { + return searchParameters; + } + + public void setSearchParameters(SearchParameters searchParameters) { + this.searchParameters = searchParameters; + } + public String getQueryText() { return queryText; } public void setQueryText(String queryText) { this.queryText = queryText; } @@ -104,6 +116,17 @@ public class AtlasSearchResult implements Serializable { public void setFullTextResult(List<AtlasFullTextResult> fullTextResult) { this.fullTextResult = fullTextResult; } + public Map<String, AtlasEntityHeader> getReferredEntities() { + return referredEntities; + } + + public void setReferredEntities(Map<String, AtlasEntityHeader> referredEntities) { + this.referredEntities = referredEntities; + } + + @Override + public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult, referredEntities); } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -116,24 +139,8 @@ public class AtlasSearchResult implements Serializable { Objects.equals(classification, that.classification) && Objects.equals(entities, that.entities) && Objects.equals(attributes, that.attributes) && - Objects.equals(fullTextResult, that.fullTextResult); - } - - @Override - public int hashCode() { return Objects.hash(queryType, searchParameters, queryText, type, classification, entities, attributes, fullTextResult); } - - @Override - public String toString() { - return "AtlasSearchResult{" + - "queryType=" + queryType + - ", searchParameters='" + searchParameters + '\'' + - ", queryText='" + queryText + '\'' + - ", type=" + type + - ", classification=" + classification + - ", entities=" + entities + - ", attributes=" + attributes + - ", fullTextResult=" + fullTextResult + - '}'; + Objects.equals(fullTextResult, that.fullTextResult) && + Objects.equals(referredEntities, 
that.referredEntities); } public void addEntity(AtlasEntityHeader newEntity) { @@ -163,12 +170,19 @@ public class AtlasSearchResult implements Serializable { } } - public void setSearchParameters(SearchParameters searchParameters) { - this.searchParameters = searchParameters; - } - - public SearchParameters getSearchParameters() { - return searchParameters; + @Override + public String toString() { + return "AtlasSearchResult{" + + "queryType=" + queryType + + ", searchParameters='" + searchParameters + '\'' + + ", queryText='" + queryText + '\'' + + ", type=" + type + + ", classification=" + classification + + ", entities=" + entities + + ", attributes=" + attributes + + ", fullTextResult=" + fullTextResult + + ", referredEntities=" + referredEntities + + '}'; } public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE } http://git-wip-us.apache.org/repos/asf/atlas/blob/bcec42e3/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java index 30855dc..972c11e 100644 --- a/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java +++ b/intg/src/main/java/org/apache/atlas/model/discovery/SearchParameters.java @@ -207,9 +207,12 @@ public class SearchParameters { return Objects.hash(query, typeName, classification, excludeDeletedEntities, limit, offset, entityFilters, tagFilters, attributes); } - @Override - public String toString() { - final StringBuilder sb = new StringBuilder("SearchParameters{"); + public StringBuilder toString(StringBuilder sb) { + if (sb == null) { + sb = new StringBuilder(); + } + + sb.append('{'); sb.append("query='").append(query).append('\''); sb.append(", typeName='").append(typeName).append('\''); sb.append(", classification='").append(classification).append('\''); @@ 
-220,7 +223,13 @@ public class SearchParameters { sb.append(", tagFilters=").append(tagFilters); sb.append(", attributes=").append(attributes); sb.append('}'); - return sb.toString(); + + return sb; + } + + @Override + public String toString() { + return toString(new StringBuilder()).toString(); } @@ -297,16 +306,25 @@ public class SearchParameters { return Objects.hash(attributeName, operator, attributeValue, condition, criterion); } - @Override - public String toString() { - final StringBuilder sb = new StringBuilder("FilterCriteria{"); + public StringBuilder toString(StringBuilder sb) { + if (sb == null) { + sb = new StringBuilder(); + } + + sb.append('{'); sb.append("attributeName='").append(attributeName).append('\''); sb.append(", operator=").append(operator); sb.append(", attributeValue='").append(attributeValue).append('\''); sb.append(", condition=").append(condition); sb.append(", criterion=").append(criterion); sb.append('}'); - return sb.toString(); + + return sb; + } + + @Override + public String toString() { + return toString(new StringBuilder()).toString(); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java index 030a957..764b548 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java @@ -64,5 +64,5 @@ public interface AtlasDiscoveryService { * @return Matching entities * @throws AtlasBaseException */ - AtlasSearchResult searchUsingBasicQuery(SearchParameters searchParameters) throws AtlasBaseException; + AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws 
AtlasBaseException; } http://git-wip-us.apache.org/repos/asf/atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java new file mode 100644 index 0000000..77b2c7c --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.discovery; + +import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.*; +import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + + +public class ClassificationSearchProcessor extends SearchProcessor { + private static final Logger LOG = LoggerFactory.getLogger(ClassificationSearchProcessor.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("ClassificationSearchProcessor"); + + private final AtlasIndexQuery indexQuery; + private final AtlasGraphQuery allGraphQuery; + private final AtlasGraphQuery filterGraphQuery; + + public ClassificationSearchProcessor(SearchContext context) { + super(context); + + AtlasClassificationType classificationType = context.getClassificationType(); + FilterCriteria filterCriteria = context.getSearchParameters().getTagFilters(); + Set<String> typeAndSubTypes = classificationType.getTypeAndAllSubTypes(); + Set<String> solrAttributes = new HashSet<>(); + Set<String> gremlinAttributes = new HashSet<>(); + Set<String> allAttributes = new HashSet<>(); + + + processSearchAttributes(classificationType, filterCriteria, solrAttributes, gremlinAttributes, allAttributes); + + // for classification search, if any attribute can't be handled by Solr - switch to all Gremlin + boolean useSolrSearch = typeAndSubTypes.size() <= MAX_CLASSIFICATION_TYPES_IN_INDEX_QUERY && CollectionUtils.isEmpty(gremlinAttributes) && canApplySolrFilter(classificationType, filterCriteria, false); + + if (useSolrSearch) { + StringBuilder solrQuery = new StringBuilder(); + + constructTypeTestQuery(solrQuery, 
typeAndSubTypes); + constructFilterQuery(solrQuery, classificationType, filterCriteria, solrAttributes); + + String solrQueryString = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")"); + + solrQueryString = STRAY_OR_PATTERN.matcher(solrQueryString).replaceAll(")"); + solrQueryString = STRAY_ELIPSIS_PATTERN.matcher(solrQueryString).replaceAll(""); + + indexQuery = context.getGraph().indexQuery(Constants.VERTEX_INDEX, solrQueryString); + } else { + indexQuery = null; + } + + AtlasGraphQuery query = context.getGraph().query().in(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes); + + allGraphQuery = toGremlinFilterQuery(classificationType, filterCriteria, allAttributes, query); + + query = context.getGraph().query().in(Constants.TRAIT_NAMES_PROPERTY_KEY, typeAndSubTypes); + + filterGraphQuery = query; // TODO: filer based on tag attributes + } + + @Override + public List<AtlasVertex> execute() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> ClassificationSearchProcessor.execute({})", context); + } + + List<AtlasVertex> ret = new ArrayList<>(); + + AtlasPerfTracer perf = null; + + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "ClassificationSearchProcessor.execute(" + context + ")"); + } + + try { + int qryOffset = (nextProcessor == null) ? 
context.getSearchParameters().getOffset() : 0; + int limit = context.getSearchParameters().getLimit(); + int resultIdx = qryOffset; + Set<String> processedGuids = new HashSet<>(); + + while (ret.size() < limit) { + if (context.terminateSearch()) { + LOG.warn("query terminated: {}", context.getSearchParameters()); + + break; + } + + List<AtlasVertex> classificationVertices; + + if (indexQuery != null) { + Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit); + + if (!queryResult.hasNext()) { // no more results from solr - end of search + break; + } + + classificationVertices = getVerticesFromIndexQueryResult(queryResult); + } else { + Iterator<AtlasVertex> queryResult = allGraphQuery.vertices(qryOffset, limit).iterator(); + + if (!queryResult.hasNext()) { // no more results - end of search + break; + } + + classificationVertices = getVertices(queryResult); + } + + qryOffset += limit; + + List<AtlasVertex> entityVertices = new ArrayList<>(); + + for (AtlasVertex classificationVertex : classificationVertices) { + Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN); + + for (AtlasEdge edge : edges) { + AtlasVertex entityVertex = edge.getOutVertex(); + String guid = AtlasGraphUtilsV1.getIdFromVertex(entityVertex); + + if (!processedGuids.contains(guid)) { + if (!context.getSearchParameters().getExcludeDeletedEntities() || AtlasGraphUtilsV1.getState(entityVertex) == AtlasEntity.Status.ACTIVE) { + entityVertices.add(entityVertex); + } + + processedGuids.add(guid); + } + } + } + + entityVertices = super.filter(entityVertices); + + for (AtlasVertex entityVertex : entityVertices) { + resultIdx++; + + if (resultIdx < context.getSearchParameters().getOffset()) { + continue; + } + + ret.add(entityVertex); + + if (ret.size() == limit) { + break; + } + } + } + } finally { + AtlasPerfTracer.log(perf); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== ClassificationSearchProcessor.execute({}): ret.size()={}", context, 
ret.size()); + } + + return ret; + } + + @Override + public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> ClassificationSearchProcessor.filter({})", entityVertices.size()); + } + + AtlasGraphQuery query = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(entityVertices)); + + query.addConditionsFrom(filterGraphQuery); + + List<AtlasVertex> ret = getVertices(query.vertices().iterator()); + + ret = super.filter(ret); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== ClassificationSearchProcessor.filter({}): ret.size()={}", entityVertices.size(), ret.size()); + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index 5068fa5..a4538bd 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -20,6 +20,7 @@ package org.apache.atlas.discovery; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy; import org.apache.atlas.exception.AtlasBaseException; @@ -29,8 +30,8 @@ import org.apache.atlas.model.discovery.AtlasSearchResult.AtlasQueryType; import org.apache.atlas.model.discovery.AtlasSearchResult.AttributeSearchResult; import org.apache.atlas.model.discovery.SearchParameters; import org.apache.atlas.model.instance.AtlasEntity.Status; -import org.apache.atlas.AtlasException; import 
org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.query.Expressions.AliasExpression; import org.apache.atlas.query.Expressions.Expression; import org.apache.atlas.query.Expressions.SelectExpression; @@ -42,16 +43,16 @@ import org.apache.atlas.query.QueryProcessor; import org.apache.atlas.query.SelectExpressionHelper; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.MetadataRepository; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; -import org.apache.atlas.type.AtlasClassificationType; -import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.*; +import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; -import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.util.AtlasGremlinQueryProvider; import org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery; import org.apache.commons.collections.CollectionUtils; @@ -67,13 +68,7 @@ import scala.util.parsing.combinator.Parsers.NoSuccess; import javax.inject.Inject; import javax.script.ScriptEngine; import javax.script.ScriptException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import static org.apache.atlas.AtlasErrorCode.CLASSIFICATION_NOT_FOUND; import static org.apache.atlas.AtlasErrorCode.DISCOVERY_QUERY_FAILED; @@ -88,21 +83,20 @@ public class 
EntityDiscoveryService implements AtlasDiscoveryService { private final EntityGraphRetriever entityRetriever; private final AtlasGremlinQueryProvider gremlinQueryProvider; private final AtlasTypeRegistry typeRegistry; - private final SearchPipeline searchPipeline; + private final GraphBackedSearchIndexer indexer; private final int maxResultSetSize; private final int maxTypesCountInIdxQuery; private final int maxTagsCountInIdxQuery; @Inject EntityDiscoveryService(MetadataRepository metadataRepository, AtlasTypeRegistry typeRegistry, - AtlasGraph graph, SearchPipeline searchPipeline) throws AtlasException { + AtlasGraph graph, GraphBackedSearchIndexer indexer) throws AtlasException { this.graph = graph; this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository); this.entityRetriever = new EntityGraphRetriever(typeRegistry); + this.indexer = indexer; this.gremlinQueryProvider = AtlasGremlinQueryProvider.INSTANCE; this.typeRegistry = typeRegistry; - this.searchPipeline = searchPipeline; - this.maxResultSetSize = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_RESULT_SET_SIZE, 150); this.maxTypesCountInIdxQuery = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_TYPES_COUNT, 10); this.maxTagsCountInIdxQuery = ApplicationProperties.get().getInt(Constants.INDEX_SEARCH_MAX_TAGS_COUNT, 10); @@ -404,20 +398,85 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { @Override @GraphTransaction - public AtlasSearchResult searchUsingBasicQuery(SearchParameters searchParameters) throws AtlasBaseException { + public AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException { AtlasSearchResult ret = new AtlasSearchResult(searchParameters); - List<AtlasVertex> resultList = searchPipeline.run(searchParameters); + SearchContext context = new SearchContext(searchParameters, typeRegistry, graph, indexer.getVertexIndexKeys()); + + List<AtlasVertex> resultList = 
context.getSearchProcessor().execute(); + + // By default any attribute that shows up in the search parameter should be sent back in the response + // If additional values are requested then the entityAttributes will be a superset of the all search attributes + // and the explicitly requested attribute(s) + Set<String> resultAttributes = new HashSet<>(); + Set<String> entityAttributes = new HashSet<>(); + + if (CollectionUtils.isNotEmpty(searchParameters.getAttributes())) { + resultAttributes.addAll(searchParameters.getAttributes()); + } + + for (String resultAttribute : resultAttributes) { + AtlasAttribute attribute = context.getEntityType().getAttribute(resultAttribute); + + if (attribute != null) { + AtlasType attributeType = attribute.getAttributeType(); + + if (attributeType instanceof AtlasArrayType) { + attributeType = ((AtlasArrayType) attributeType).getElementType(); + } + + if (attributeType instanceof AtlasEntityType || attributeType instanceof AtlasObjectIdType) { + entityAttributes.add(resultAttribute); + } + } + } for (AtlasVertex atlasVertex : resultList) { - AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(atlasVertex, searchParameters.getAttributes()); + AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(atlasVertex, resultAttributes); ret.addEntity(entity); + + // populate ret.referredEntities + for (String entityAttribute : entityAttributes) { + Object attrValue = entity.getAttribute(entityAttribute); + + if (attrValue instanceof AtlasObjectId) { + AtlasObjectId objId = (AtlasObjectId)attrValue; + + if (ret.getReferredEntities() == null) { + ret.setReferredEntities(new HashMap<String, AtlasEntityHeader>()); + } + + if (!ret.getReferredEntities().containsKey(objId.getGuid())) { + ret.getReferredEntities().put(objId.getGuid(), entityRetriever.toAtlasEntityHeader(objId.getGuid())); + } + } else if (attrValue instanceof Collection) { + Collection objIds = (Collection)attrValue; + + for (Object obj : objIds) { + if (obj 
instanceof AtlasObjectId) { + AtlasObjectId objId = (AtlasObjectId)obj; + + if (ret.getReferredEntities() == null) { + ret.setReferredEntities(new HashMap<String, AtlasEntityHeader>()); + } + + if (!ret.getReferredEntities().containsKey(objId.getGuid())) { + ret.getReferredEntities().put(objId.getGuid(), entityRetriever.toAtlasEntityHeader(objId.getGuid())); + } + } + } + } + } } return ret; } + public int getMaxResultSetSize() { + return maxResultSetSize; + } + private String getQueryForFullTextSearch(String userKeyedString, String typeName, String classification) { String typeFilter = getTypeFilter(typeRegistry, typeName, maxTypesCountInIdxQuery); String classficationFilter = getClassificationFilter(typeRegistry, classification, maxTagsCountInIdxQuery); @@ -447,28 +506,6 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { return String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, queryText.toString()); } - private static String getClassificationFilter(AtlasTypeRegistry typeRegistry, String classificationName, int maxTypesCountInIdxQuery) { - AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(classificationName); - Set<String> typeAndSubTypes = classification != null ? classification.getTypeAndAllSubTypes() : null; - - if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) { - return String.format("(%s)", StringUtils.join(typeAndSubTypes, " ")); - } - - return ""; - } - - private static String getTypeFilter(AtlasTypeRegistry typeRegistry, String typeName, int maxTypesCountInIdxQuery) { - AtlasEntityType type = typeRegistry.getEntityTypeByName(typeName); - Set<String> typeAndSubTypes = type != null ? 
type.getTypeAndAllSubTypes() : null; - - if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) { - return String.format("(%s)", StringUtils.join(typeAndSubTypes, " ")); - } - - return ""; - } - private List<AtlasFullTextResult> getIndexQueryResults(AtlasIndexQuery query, QueryParams params, boolean excludeDeletedEntities) throws AtlasBaseException { List<AtlasFullTextResult> ret = new ArrayList<>(); Iterator<Result> iter = query.vertices(); @@ -570,8 +607,25 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { return excludeDeletedEntities && GraphHelper.getStatus(vertex) == Status.DELETED; } - public int getMaxResultSetSize() { - return maxResultSetSize; + private static String getClassificationFilter(AtlasTypeRegistry typeRegistry, String classificationName, int maxTypesCountInIdxQuery) { + AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(classificationName); + Set<String> typeAndSubTypes = classification != null ? classification.getTypeAndAllSubTypes() : null; + + if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) { + return String.format("(%s)", StringUtils.join(typeAndSubTypes, " ")); + } + + return ""; } + private static String getTypeFilter(AtlasTypeRegistry typeRegistry, String typeName, int maxTypesCountInIdxQuery) { + AtlasEntityType type = typeRegistry.getEntityTypeByName(typeName); + Set<String> typeAndSubTypes = type != null ? 
type.getTypeAndAllSubTypes() : null; + + if(CollectionUtils.isNotEmpty(typeAndSubTypes) && typeAndSubTypes.size() <= maxTypesCountInIdxQuery) { + return String.format("(%s)", StringUtils.join(typeAndSubTypes, " ")); + } + + return ""; + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java new file mode 100644 index 0000000..605cb15 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.atlas.discovery;
+
+import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.*;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * {@link SearchProcessor} that selects entity vertices whose type is the requested entity type or one of
+ * its sub-types and that satisfy the entity attribute filters of the search parameters.
+ *
+ * <p>Up to three queries are prepared at construction time:
+ * <ul>
+ *   <li>{@code indexQuery} - query against {@code Constants.VERTEX_INDEX}; built only when the number of
+ *       types does not exceed {@code MAX_ENTITY_TYPES_IN_INDEX_QUERY} and {@code canApplySolrFilter}
+ *       accepts the filter criteria, otherwise {@code null};</li>
+ *   <li>{@code partialGraphQuery} - graph query applied on top of the index results; built only when
+ *       {@code gremlinAttributes} is non-empty or a classification was requested, otherwise {@code null};</li>
+ *   <li>{@code allGraphQuery} - graph query carrying the type-name, classification-name, attribute and
+ *       (optionally) ACTIVE-state conditions; used by {@link #execute()} when there is no index query and
+ *       by {@link #filter(List)}.</li>
+ * </ul>
+ */
+public class EntitySearchProcessor extends SearchProcessor {
+    private static final Logger LOG      = LoggerFactory.getLogger(EntitySearchProcessor.class);
+    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("EntitySearchProcessor");
+
+    private final AtlasIndexQuery indexQuery;
+    private final AtlasGraphQuery partialGraphQuery;
+    private final AtlasGraphQuery allGraphQuery;
+
+    /**
+     * Prepares the index and graph queries for the entity type held by the given context.
+     *
+     * @param context search context; its entity type is dereferenced here, so it must not be null
+     */
+    public EntitySearchProcessor(SearchContext context) {
+        super(context);
+
+        AtlasEntityType         entityType         = context.getEntityType();
+        AtlasClassificationType classificationType = context.getClassificationType();
+        FilterCriteria          filterCriteria     = context.getSearchParameters().getEntityFilters();
+        Set<String>             typeAndSubTypes    = entityType.getTypeAndAllSubTypes();
+        Set<String>             solrAttributes     = new HashSet<>();
+        Set<String>             gremlinAttributes  = new HashSet<>();
+        Set<String>             allAttributes      = new HashSet<>();
+
+        // presumably sorts the filter attributes into index-capable, graph-only and all - confirm in SearchProcessor
+        processSearchAttributes(entityType, filterCriteria, solrAttributes, gremlinAttributes, allAttributes);
+
+        boolean useSolrSearch = typeAndSubTypes.size() <= MAX_ENTITY_TYPES_IN_INDEX_QUERY && canApplySolrFilter(entityType, filterCriteria, false);
+
+        if (useSolrSearch) {
+            StringBuilder solrQuery = new StringBuilder();
+
+            constructTypeTestQuery(solrQuery, typeAndSubTypes);
+            constructFilterQuery(solrQuery, entityType, filterCriteria, solrAttributes);
+
+            // clean up the generated query text using the STRAY_* patterns inherited from SearchProcessor
+            String solrQueryString = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")");
+
+            solrQueryString = STRAY_OR_PATTERN.matcher(solrQueryString).replaceAll(")");
+            solrQueryString = STRAY_ELIPSIS_PATTERN.matcher(solrQueryString).replaceAll("");
+
+            // NOTE(review): the ACTIVE-state condition is only added to allGraphQuery below; confirm that
+            // deleted entities are excluded on the index path as well.
+            indexQuery = context.getGraph().indexQuery(Constants.VERTEX_INDEX, solrQueryString);
+
+            if (CollectionUtils.isNotEmpty(gremlinAttributes) || classificationType != null) {
+                AtlasGraphQuery partialQuery = context.getGraph().query();
+
+                addClassificationNameConditionIfNecessary(partialQuery);
+
+                partialGraphQuery = toGremlinFilterQuery(entityType, filterCriteria, gremlinAttributes, partialQuery);
+            } else {
+                partialGraphQuery = null;
+            }
+        } else {
+            indexQuery        = null;
+            partialGraphQuery = null;
+        }
+
+        AtlasGraphQuery query = context.getGraph().query().in(Constants.TYPE_NAME_PROPERTY_KEY, typeAndSubTypes);
+
+        addClassificationNameConditionIfNecessary(query);
+
+        allGraphQuery = toGremlinFilterQuery(entityType, filterCriteria, allAttributes, query);
+
+        if (context.getSearchParameters().getExcludeDeletedEntities()) {
+            allGraphQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE");
+        }
+    }
+
+    /**
+     * Runs the search page by page until {@code limit} vertices are collected, the underlying query
+     * returns no more results, or the context asks for the search to be terminated.
+     *
+     * <p>The requested offset is pushed into the underlying query only when nothing filters its results
+     * afterwards. When a next processor exists, or when index results are narrowed by
+     * {@code partialGraphQuery}, the query is paged from 0 and the first {@code offset} vertices that
+     * survive filtering are skipped here instead.
+     *
+     * @return at most {@code limit} matching vertices; never null
+     */
+    @Override
+    public List<AtlasVertex> execute() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> EntitySearchProcessor.execute({})", context);
+        }
+
+        List<AtlasVertex> ret = new ArrayList<>();
+
+        AtlasPerfTracer perf = null;
+
+        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntitySearchProcessor.execute(" + context + ")");
+        }
+
+        try {
+            final int     startIdx     = context.getSearchParameters().getOffset();
+            final int     limit        = context.getSearchParameters().getLimit();
+            // results are filtered after the query either by the next processor or by partialGraphQuery
+            final boolean postFiltered = nextProcessor != null || (indexQuery != null && partialGraphQuery != null);
+            int           qryOffset    = postFiltered ? 0 : startIdx;
+            int           resultIdx    = qryOffset;
+
+            while (ret.size() < limit) {
+                if (context.terminateSearch()) {
+                    LOG.warn("query terminated: {}", context.getSearchParameters());
+
+                    break;
+                }
+
+                List<AtlasVertex> vertices;
+
+                if (indexQuery != null) {
+                    Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit);
+
+                    if (!queryResult.hasNext()) { // no more results from solr - end of search
+                        break;
+                    }
+
+                    vertices = getVerticesFromIndexQueryResult(queryResult);
+
+                    if (partialGraphQuery != null) {
+                        AtlasGraphQuery guidQuery = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(vertices));
+
+                        guidQuery.addConditionsFrom(partialGraphQuery);
+
+                        vertices = getVertices(guidQuery.vertices().iterator());
+                    }
+                } else {
+                    Iterator<AtlasVertex> queryResult = allGraphQuery.vertices(qryOffset, limit).iterator();
+
+                    if (!queryResult.hasNext()) { // no more results from query - end of search
+                        break;
+                    }
+
+                    vertices = getVertices(queryResult);
+                }
+
+                qryOffset += limit;
+
+                vertices = super.filter(vertices);
+
+                for (AtlasVertex vertex : vertices) {
+                    resultIdx++;
+
+                    // resultIdx is the 1-based position of this vertex, so the first startIdx
+                    // positions (inclusive) are the ones to skip
+                    if (resultIdx <= startIdx) {
+                        continue;
+                    }
+
+                    ret.add(vertex);
+
+                    if (ret.size() == limit) {
+                        break;
+                    }
+                }
+            }
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== EntitySearchProcessor.execute({}): ret.size()={}", context, ret.size());
+        }
+
+        return ret;
+    }
+
+    /**
+     * Narrows the given vertices to those that also satisfy this processor's conditions, then hands the
+     * survivors to {@code super.filter}.
+     *
+     * @param entityVertices candidate vertices, looked up again by GUID together with the conditions
+     *                       of {@code allGraphQuery}
+     * @return the vertices returned by {@code super.filter} for the narrowed list
+     */
+    @Override
+    public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> EntitySearchProcessor.filter({})", entityVertices.size());
+        }
+
+        AtlasGraphQuery query = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(entityVertices));
+
+        query.addConditionsFrom(allGraphQuery);
+
+        List<AtlasVertex> ret = getVertices(query.vertices().iterator());
+
+        ret = super.filter(ret);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== EntitySearchProcessor.filter({}): ret.size()={}", entityVertices.size(), ret.size());
+        }
+
+        return ret;
+    }
+
+    /**
+     * Adds a trait-name condition to the query when a classification was requested and the context
+     * reports that no separate classification processor is needed.
+     */
+    private void addClassificationNameConditionIfNecessary(AtlasGraphQuery query) {
+        if (context.getClassificationType() != null && !context.needClassificationProcessor()) {
+            query.in(Constants.TRAIT_NAMES_PROPERTY_KEY, context.getClassificationType().getTypeAndAllSubTypes());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
new file mode 100644
index 0000000..4ddd642
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.discovery;
+
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.utils.AtlasPerfTracer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * {@link SearchProcessor} that runs the free-text query of the search parameters against
+ * {@code Constants.FULLTEXT_INDEX} and passes each page of hits through {@code super.filter}.
+ */
+public class FullTextSearchProcessor extends SearchProcessor {
+    private static final Logger LOG      = LoggerFactory.getLogger(FullTextSearchProcessor.class);
+    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("FullTextSearchProcessor");
+
+    private final AtlasIndexQuery indexQuery;
+
+    /**
+     * Builds the full-text index query from {@code context.getSearchParameters().getQuery()}.
+     *
+     * @param context search context supplying the search parameters and the graph
+     */
+    public FullTextSearchProcessor(SearchContext context) {
+        super(context);
+
+        SearchParameters searchParameters = context.getSearchParameters();
+        // the query text is embedded as-is, restricted to the entity-text property
+        String           queryString      = String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, searchParameters.getQuery());
+
+        indexQuery = context.getGraph().indexQuery(Constants.FULLTEXT_INDEX, queryString);
+    }
+
+    /**
+     * Pages through the index query until {@code limit} vertices are collected, the index returns no
+     * more hits, or the context asks for the search to be terminated.
+     *
+     * <p>When there is no next processor the requested offset is pushed into the index query. Otherwise
+     * the index is paged from 0 and the first {@code offset} vertices returned by {@code super.filter}
+     * are skipped here.
+     *
+     * @return at most {@code limit} matching vertices; never null
+     */
+    @Override
+    public List<AtlasVertex> execute() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> FullTextSearchProcessor.execute({})", context);
+        }
+
+        List<AtlasVertex> ret = new ArrayList<>();
+
+        AtlasPerfTracer perf = null;
+
+        if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+            perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "FullTextSearchProcessor.execute(" + context + ")");
+        }
+
+        try {
+            final int startIdx  = context.getSearchParameters().getOffset();
+            final int limit     = context.getSearchParameters().getLimit();
+            int       qryOffset = (nextProcessor == null) ? startIdx : 0;
+            int       resultIdx = qryOffset;
+
+            while (ret.size() < limit) {
+                if (context.terminateSearch()) {
+                    LOG.warn("query terminated: {}", context.getSearchParameters());
+
+                    break;
+                }
+
+                Iterator<AtlasIndexQuery.Result> idxQueryResult = indexQuery.vertices(qryOffset, limit);
+
+                if (!idxQueryResult.hasNext()) { // no more results from solr - end of search
+                    break;
+                }
+
+                qryOffset += limit;
+
+                List<AtlasVertex> vertices = getVerticesFromIndexQueryResult(idxQueryResult);
+
+                vertices = super.filter(vertices);
+
+                for (AtlasVertex vertex : vertices) {
+                    resultIdx++;
+
+                    // resultIdx is the 1-based position of this vertex, so the first startIdx
+                    // positions (inclusive) are the ones to skip
+                    if (resultIdx <= startIdx) {
+                        continue;
+                    }
+
+                    ret.add(vertex);
+
+                    if (ret.size() == limit) {
+                        break;
+                    }
+                }
+            }
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== FullTextSearchProcessor.execute({}): ret.size()={}", context, ret.size());
+        }
+
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java b/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java
deleted file mode 100644
index 1056b3e..0000000
--- a/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.discovery; - -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.discovery.SearchParameters; -import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria; -import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition; -import org.apache.atlas.model.discovery.SearchParameters.Operator; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graphdb.AtlasEdge; -import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasGraphQuery; -import org.apache.atlas.repository.graphdb.AtlasIndexQuery; -import org.apache.atlas.repository.graphdb.AtlasVertex; -import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; -import org.apache.atlas.type.AtlasClassificationType; -import org.apache.atlas.type.AtlasEntityType; -import org.apache.atlas.type.AtlasStructType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.utils.AtlasPerfTracer; -import org.apache.commons.collections.CollectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -import javax.inject.Inject; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -import static org.apache.atlas.discovery.SearchPipeline.IndexResultType; -import static 
org.apache.atlas.discovery.SearchPipeline.PipelineContext; -import static org.apache.atlas.discovery.SearchPipeline.PipelineStep; -import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator; -import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator; - -@Component -public class GremlinStep implements PipelineStep { - private static final Logger LOG = LoggerFactory.getLogger(GremlinStep.class); - private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("GremlinSearchStep"); - - private final AtlasGraph graph; - private final AtlasTypeRegistry typeRegistry; - - enum GremlinFilterQueryType { TAG, ENTITY } - - @Inject - public GremlinStep(AtlasGraph graph, AtlasTypeRegistry typeRegistry) { - this.graph = graph; - this.typeRegistry = typeRegistry; - } - - @Override - public void execute(PipelineContext context) throws AtlasBaseException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> GremlinStep.execute({})", context); - } - - if (context == null) { - throw new AtlasBaseException("Can't start search without any context"); - } - - AtlasPerfTracer perf = null; - - if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { - perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "GremlinSearchStep.execute(" + context + ")"); - } - - final Iterator<AtlasVertex> result; - - if (context.hasIndexResults()) { - // We have some results from the indexed step, let's proceed accordingly - if (context.getIndexResultType() == IndexResultType.TAG) { - // Index search was done on tag and filters - if (context.isTagProcessingComplete()) { - LOG.debug("GremlinStep.execute(): index has completely processed tag, further TAG filtering not needed"); - - Set<String> taggedVertexGUIDs = new HashSet<>(); - - Iterator<AtlasIndexQuery.Result> tagVertexIterator = context.getIndexResultsIterator(); - - while (tagVertexIterator.hasNext()) { - // Find out which Vertex has this outgoing edge - AtlasVertex vertex = tagVertexIterator.next().getVertex(); 
- Iterable<AtlasEdge> edges = vertex.getEdges(AtlasEdgeDirection.IN); - - for (AtlasEdge edge : edges) { - String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex()); - - taggedVertexGUIDs.add(guid); - } - } - - // No entities are tagged (actually this check is already done) - if (!taggedVertexGUIDs.isEmpty()) { - result = processEntity(taggedVertexGUIDs, context); - } else { - result = null; - } - } else { - result = processTagAndEntity(Collections.<String>emptySet(), context); - } - } else if (context.getIndexResultType() == IndexResultType.TEXT) { - // Index step processed full-text; - Set<String> entityIDs = getVertexIDs(context.getIndexResultsIterator()); - - result = processTagAndEntity(entityIDs, context); - } else if (context.getIndexResultType() == IndexResultType.ENTITY) { - // Index step processed entity and it's filters; tag filter wouldn't be set - Set<String> entityIDs = getVertexIDs(context.getIndexResultsIterator()); - - result = processEntity(entityIDs, context); - } else { - result = null; - } - } else { - // No index results, need full processing in Gremlin - if (context.getClassificationType() != null) { - // Process tag and filters first, then entity filters - result = processTagAndEntity(Collections.<String>emptySet(), context); - } else { - result = processEntity(Collections.<String>emptySet(), context); - } - } - - context.setGremlinResultIterator(result); - - AtlasPerfTracer.log(perf); - - if (LOG.isDebugEnabled()) { - LOG.debug("<== GremlinStep.execute({})", context); - } - } - - private Iterator<AtlasVertex> processEntity(Set<String> entityGUIDs, PipelineContext context) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> GremlinStep.processEntity(entityGUIDs={})", entityGUIDs); - } - - final Iterator<AtlasVertex> ret; - - SearchParameters searchParameters = context.getSearchParameters(); - AtlasEntityType entityType = context.getEntityType(); - - if (entityType != null) { - AtlasGraphQuery entityFilterQuery = 
context.getGraphQuery("ENTITY_FILTER"); - - if (entityFilterQuery == null) { - entityFilterQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, entityType.getTypeAndAllSubTypes()); - - if (searchParameters.getEntityFilters() != null) { - toGremlinFilterQuery(GremlinFilterQueryType.ENTITY, entityType, searchParameters.getEntityFilters(), entityFilterQuery, context); - } - - if (searchParameters.getExcludeDeletedEntities()) { - entityFilterQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE"); - } - - context.cacheGraphQuery("ENTITY_FILTER", entityFilterQuery); - } - - // Now get all vertices - if (CollectionUtils.isEmpty(entityGUIDs)) { - ret = entityFilterQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator(); - } else { - AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs); - - if (entityFilterQuery != null) { - guidQuery.addConditionsFrom(entityFilterQuery); - } else if (searchParameters.getExcludeDeletedEntities()) { - guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE"); - } - - ret = guidQuery.vertices(context.getMaxLimit()).iterator(); - } - } else if (CollectionUtils.isNotEmpty(entityGUIDs)) { - AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs); - - if (searchParameters.getExcludeDeletedEntities()) { - guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE"); - } - - Iterable<AtlasVertex> vertices = guidQuery.vertices(context.getMaxLimit()); - - ret = vertices.iterator(); - } else { - ret = null; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== GremlinStep.processEntity(entityGUIDs={})", entityGUIDs); - } - - return ret; - } - - private Iterator<AtlasVertex> processTagAndEntity(Set<String> entityGUIDs, PipelineContext context) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs); - } - - final Iterator<AtlasVertex> ret; - - AtlasClassificationType classificationType = context.getClassificationType(); - - 
if (classificationType != null) { - AtlasGraphQuery tagVertexQuery = context.getGraphQuery("TAG_VERTEX"); - - if (tagVertexQuery == null) { - tagVertexQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, classificationType.getTypeAndAllSubTypes()); - - SearchParameters searchParameters = context.getSearchParameters(); - - // Do tag filtering first as it'll return a smaller subset of vertices - if (searchParameters.getTagFilters() != null) { - toGremlinFilterQuery(GremlinFilterQueryType.TAG, classificationType, searchParameters.getTagFilters(), tagVertexQuery, context); - } - - context.cacheGraphQuery("TAG_VERTEX", tagVertexQuery); - } - - if (tagVertexQuery != null) { - Set<String> taggedVertexGuids = new HashSet<>(); - // Now get all vertices after adjusting offset for each iteration - LOG.debug("Firing TAG query"); - - Iterator<AtlasVertex> tagVertexIterator = tagVertexQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator(); - - while (tagVertexIterator.hasNext()) { - // Find out which Vertex has this outgoing edge - Iterable<AtlasEdge> edges = tagVertexIterator.next().getEdges(AtlasEdgeDirection.IN); - for (AtlasEdge edge : edges) { - String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex()); - taggedVertexGuids.add(guid); - } - } - - entityGUIDs = taggedVertexGuids; - } - } - - if (!entityGUIDs.isEmpty()) { - ret = processEntity(entityGUIDs, context); - } else { - ret = null; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs); - } - - return ret; - } - - private Set<String> getVertexIDs(Iterator<AtlasIndexQuery.Result> idxResultsIterator) { - Set<String> guids = new HashSet<>(); - while (idxResultsIterator.hasNext()) { - AtlasVertex vertex = idxResultsIterator.next().getVertex(); - String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex); - guids.add(guid); - } - return guids; - } - - private Set<String> getVertexIDs(Iterable<AtlasVertex> vertices) { - 
Set<String> guids = new HashSet<>(); - for (AtlasVertex vertex : vertices) { - String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex); - guids.add(guid); - } - return guids; - } - - private AtlasGraphQuery toGremlinFilterQuery(GremlinFilterQueryType queryType, AtlasStructType type, FilterCriteria criteria, - AtlasGraphQuery query, PipelineContext context) { - if (criteria.getCondition() != null) { - if (criteria.getCondition() == Condition.AND) { - for (FilterCriteria filterCriteria : criteria.getCriterion()) { - AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context); - query.addConditionsFrom(nestedQuery); - } - } else { - List<AtlasGraphQuery> orConditions = new LinkedList<>(); - - for (FilterCriteria filterCriteria : criteria.getCriterion()) { - AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context); - // FIXME: Something might not be right here as the queries are getting overwritten sometimes - orConditions.add(graph.query().createChildQuery().addConditionsFrom(nestedQuery)); - } - - if (!orConditions.isEmpty()) { - query.or(orConditions); - } - } - } else { - String attrName = criteria.getAttributeName(); - String attrValue = criteria.getAttributeValue(); - Operator operator = criteria.getOperator(); - - try { - // If attribute belongs to supertype then adjust the name accordingly - final String qualifiedAttributeName; - final boolean attrProcessed; - - if (queryType == GremlinFilterQueryType.TAG) { - qualifiedAttributeName = type.getQualifiedAttributeName(attrName); - attrProcessed = context.hasProcessedTagAttribute(qualifiedAttributeName); - } else { - qualifiedAttributeName = type.getQualifiedAttributeName(attrName); - attrProcessed = context.hasProcessedEntityAttribute(qualifiedAttributeName); - } - - // Check if the qualifiedAttribute has been processed - if (!attrProcessed) { - switch (operator) { - case LT: - query.has(qualifiedAttributeName, 
ComparisionOperator.LESS_THAN, attrValue); - break; - case LTE: - query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN_EQUAL, attrValue); - break; - case GT: - query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN, attrValue); - break; - case GTE: - query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN_EQUAL, attrValue); - break; - case EQ: - query.has(qualifiedAttributeName, ComparisionOperator.EQUAL, attrValue); - break; - case NEQ: - query.has(qualifiedAttributeName, ComparisionOperator.NOT_EQUAL, attrValue); - break; - case LIKE: - // TODO: Maybe we need to validate pattern - query.has(qualifiedAttributeName, MatchingOperator.REGEX, getLikeRegex(attrValue)); - break; - case CONTAINS: - query.has(qualifiedAttributeName, MatchingOperator.REGEX, getContainsRegex(attrValue)); - break; - case STARTS_WITH: - query.has(qualifiedAttributeName, MatchingOperator.PREFIX, attrValue); - break; - case ENDS_WITH: - query.has(qualifiedAttributeName, MatchingOperator.REGEX, getSuffixRegex(attrValue)); - break; - case IN: - LOG.warn("{}: unsupported operator. 
Ignored", operator); - break; - } - } - } catch (AtlasBaseException e) { - LOG.error("toGremlinFilterQuery(): failed for attrName=" + attrName + "; operator=" + operator + "; attrValue=" + attrValue, e); - } - } - - return query; - } - - private String getContainsRegex(String attributeValue) { - return ".*" + attributeValue + ".*"; - } - - private String getSuffixRegex(String attributeValue) { - return ".*" + attributeValue; - } - - private String getLikeRegex(String attributeValue) { return ".*" + attributeValue + ".*"; } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/bcec42e3/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java b/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java new file mode 100644 index 0000000..2125d61 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/SearchContext.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.atlas.discovery;
+
+
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Set;
+
+
+/**
+ * Holds everything one search needs: the search parameters, the type registry, the graph, the set of
+ * indexed keys, and the entity/classification types resolved from the parameters. The constructor also
+ * creates the processors the search requires and links them through {@link #getSearchProcessor()}.
+ */
+public class SearchContext {
+    private final SearchParameters        searchParameters;
+    private final AtlasTypeRegistry       typeRegistry;
+    private final AtlasGraph              graph;
+    private final Set<String>             indexedKeys;
+    private final AtlasEntityType         entityType;
+    private final AtlasClassificationType classificationType;
+    private       SearchProcessor         searchProcessor;
+    private       boolean                 terminateSearch = false;
+
+    /**
+     * Resolves the entity and classification types named in the search parameters and registers, in this
+     * order, a full-text, a classification and an entity processor, each only when needed.
+     *
+     * @param searchParameters parameters of the search; dereferenced here, so must not be null
+     * @param typeRegistry     registry used to look up the entity and classification types by name
+     * @param graph            graph the processors query
+     * @param indexedKeys      set of indexed keys, exposed via {@link #getIndexedKeys()}
+     */
+    public SearchContext(SearchParameters searchParameters, AtlasTypeRegistry typeRegistry, AtlasGraph graph, Set<String> indexedKeys) {
+        this.searchParameters   = searchParameters;
+        this.typeRegistry       = typeRegistry;
+        this.graph              = graph;
+        this.indexedKeys        = indexedKeys;
+        this.entityType         = typeRegistry.getEntityTypeByName(searchParameters.getTypeName());
+        this.classificationType = typeRegistry.getClassificationTypeByName(searchParameters.getClassification());
+
+        // All fields above must be assigned before any processor is created: the processor
+        // constructors read them back through this (still under construction) context.
+        if (needFullTextProcessor()) {
+            addProcessor(new FullTextSearchProcessor(this));
+        }
+
+        if (needClassificationProcessor()) {
+            addProcessor(new ClassificationSearchProcessor(this));
+        }
+
+        if (needEntityProcessor()) {
+            addProcessor(new EntitySearchProcessor(this));
+        }
+    }
+
+    public SearchParameters getSearchParameters() { return searchParameters; }
+
+    public AtlasTypeRegistry getTypeRegistry() { return typeRegistry; }
+
+    public AtlasGraph getGraph() { return graph; }
+
+    public Set<String> getIndexedKeys() { return indexedKeys; }
+
+    public AtlasEntityType getEntityType() { return entityType; }
+
+    public AtlasClassificationType getClassificationType() { return classificationType; }
+
+    /** @return the first processor registered by the constructor, or null when none was needed */
+    public SearchProcessor getSearchProcessor() { return searchProcessor; }
+
+    /** @return the current value of the terminate-search flag */
+    public boolean terminateSearch() { return this.terminateSearch; }
+
+    /** Sets the terminate-search flag. */
+    public void terminateSearch(boolean terminateSearch) { this.terminateSearch = terminateSearch; }
+
+    /**
+     * Appends a textual form of this context to the given builder.
+     *
+     * @param sb builder to append to; a new one is created when null
+     * @return the builder that was appended to
+     */
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("searchParameters=");
+
+        if (searchParameters != null) {
+            searchParameters.toString(sb);
+        }
+
+        return sb;
+    }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+
+    /** @return true when the search parameters carry a non-empty free-text query */
+    public boolean needFullTextProcessor() {
+        return StringUtils.isNotEmpty(searchParameters.getQuery());
+    }
+
+    /**
+     * Misspelled original name, kept so existing callers keep compiling.
+     *
+     * @deprecated use {@link #needFullTextProcessor()}
+     */
+    @Deprecated
+    public boolean needFullTextrocessor() {
+        return needFullTextProcessor();
+    }
+
+    /**
+     * @return true when a classification was resolved and either tag filters carry an attribute filter
+     *         or no entity type was resolved
+     */
+    public boolean needClassificationProcessor() {
+        return classificationType != null && (hasAttributeFilter(searchParameters.getTagFilters()) || entityType == null);
+    }
+
+    /** @return true when an entity type was resolved from the search parameters */
+    public boolean needEntityProcessor() {
+        return entityType != null;
+    }
+
+    /** Tells whether the criteria hold either nested criteria or a single attribute name. */
+    private boolean hasAttributeFilter(FilterCriteria filterCriteria) {
+        return filterCriteria != null &&
+               (CollectionUtils.isNotEmpty(filterCriteria.getCriterion()) || StringUtils.isNotEmpty(filterCriteria.getAttributeName()));
+    }
+
+    /** Makes the processor the head of the chain when it is the first one, otherwise hands it to the head. */
+    private void addProcessor(SearchProcessor processor) {
+        if (this.searchProcessor == null) {
+            this.searchProcessor = processor;
+        } else {
+            this.searchProcessor.addProcessor(processor);
+        }
+    }
+}