Repository: incubator-atlas Updated Branches: refs/heads/master 68c559254 -> 6170063c8
ATLAS-1503: implementation of import REST API; optimization of export API implementation Signed-off-by: Madhan Neethiraj <mad...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/6170063c Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/6170063c Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/6170063c Branch: refs/heads/master Commit: 6170063c877870856e21ab9b6fa1bdf638860421 Parents: 68c5592 Author: ashutoshm <ames...@hortonworks.com> Authored: Wed Feb 15 09:06:05 2017 -0800 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Thu Feb 16 01:39:13 2017 -0800 ---------------------------------------------------------------------- .../atlas/repository/graphdb/AtlasGraph.java | 21 +- .../repository/graphdb/titan0/Titan0Graph.java | 16 +- .../atlas/model/impexp/AtlasExportResult.java | 27 +-- .../atlas/model/impexp/AtlasImportRequest.java | 76 +++++++ .../atlas/model/impexp/AtlasImportResult.java | 161 +++++++++++++++ .../store/graph/v1/AtlasEntityStoreV1.java | 11 +- .../store/graph/v1/EntityGraphMapper.java | 7 +- .../store/graph/v1/EntityImportStream.java | 21 ++ .../store/graph/v1/IDBasedEntityResolver.java | 10 +- .../atlas/web/resources/AdminResource.java | 96 ++++++++- .../atlas/web/resources/ExportService.java | 159 +++++++++------ .../atlas/web/resources/ImportService.java | 137 +++++++++++++ .../atlas/web/resources/ZipExportFileNames.java | 34 ++++ .../org/apache/atlas/web/resources/ZipSink.java | 17 +- .../apache/atlas/web/resources/ZipSource.java | 199 +++++++++++++++++++ .../org/apache/atlas/web/util/Servlets.java | 22 ++ .../atlas/web/resources/AdminResourceTest.java | 4 +- 17 files changed, 899 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java ---------------------------------------------------------------------- diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java index 13b9a11..f7e5c5c 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Set; +import javax.script.Bindings; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; import javax.script.ScriptException; import org.apache.atlas.groovy.GroovyExpression; @@ -265,6 +268,22 @@ public interface AtlasGraph<V, E> { Object executeGremlinScript(String query, boolean isPath) throws ScriptException; /** + * Executes a Gremlin script using a ScriptEngine provided by consumer, returns an object with the result. + * This is useful for scenarios where an operation executes a large number of queries. + * + * @param scriptEngine: ScriptEngine initialized by consumer. + * @param bindings: Update bindings with Graph instance for ScriptEngine that is initialized externally. + * @param query + * @param isPath whether this is a path query + * + * @return the result from executing the script + * + * @throws ScriptException + */ + Object executeGremlinScript(ScriptEngine scriptEngine, Bindings bindings, String query, boolean isPath) throws ScriptException; + + + /** * Convenience method to check whether the given property is * a multi-property. 
* @@ -272,6 +291,4 @@ public interface AtlasGraph<V, E> { * @return */ boolean isMultiProperty(String name); - - } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Graph.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Graph.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Graph.java index f1b38e1..75ea545 100644 --- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Graph.java +++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Graph.java @@ -264,6 +264,10 @@ public class Titan0Graph implements AtlasGraph<Titan0Vertex, Titan0Edge> { public Object executeGremlinScript(String query, boolean isPath) throws ScriptException { Object result = executeGremlinScript(query); + return convertGremlinScriptResult(isPath, result); + } + + private Object convertGremlinScriptResult(boolean isPath, Object result) { if (isPath) { List<Object> path = convertPathQueryResultToList(result); @@ -277,6 +281,17 @@ public class Titan0Graph implements AtlasGraph<Titan0Vertex, Titan0Edge> { } } + @Override + public Object executeGremlinScript(ScriptEngine scriptEngine, Bindings bindings, String query, boolean isPath) throws ScriptException { + if(!bindings.containsKey("g")) { + bindings.put("g", getGraph()); + } + + Object result = scriptEngine.eval(query, bindings); + return convertGremlinScriptResult(isPath, result); + + } + private Object executeGremlinScript(String gremlinQuery) throws ScriptException { ScriptEngineManager manager = new ScriptEngineManager(); @@ -351,7 +366,6 @@ public class Titan0Graph implements AtlasGraph<Titan0Vertex, Titan0Edge> { }); } - @Override public boolean isMultiProperty(String propertyName) { return multiProperties.contains(propertyName); 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java index d89dd36..e6a967e 100644 --- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java @@ -29,6 +29,7 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -187,15 +188,15 @@ public class AtlasExportResult implements Serializable { public static class AtlasExportData implements Serializable{ private static final long serialVersionUID = 1L; - private AtlasTypesDef typesDef; - private Map<String, AtlasEntity> entities; - private Map<String, List<String>> entityCreationOrder; + private AtlasTypesDef typesDef; + private Map<String, AtlasEntity> entities; + private List<String> entityCreationOrder; public AtlasExportData() { typesDef = new AtlasTypesDef(); entities = new HashMap<>(); - entityCreationOrder = new HashMap<>(); + entityCreationOrder = new ArrayList<>(); } public AtlasTypesDef getTypesDef() { return typesDef; } @@ -206,25 +207,11 @@ public class AtlasExportResult implements Serializable { public void setEntities(Map<String, AtlasEntity> entities) { this.entities = entities; } - public Map<String, List<String>> getEntityCreationOrder() { return entityCreationOrder; } + public List<String> getEntityCreationOrder() { return entityCreationOrder; } - public void setEntityCreationOrder(Map<String, List<String>> entityCreationOrder) { this.entityCreationOrder = 
entityCreationOrder; } + public void setEntityCreationOrder(List<String> entityCreationOrder) { this.entityCreationOrder = entityCreationOrder; } - public void clear() { - if (entityCreationOrder != null) { - entityCreationOrder.clear(); - } - - if (typesDef != null) { - typesDef.clear(); - } - - if (entities != null) { - entities.clear(); - } - } - public StringBuilder toString(StringBuilder sb) { if (sb == null) { sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java new file mode 100644 index 0000000..7530196 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.model.impexp; + +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY; + + +@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown=true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class AtlasImportRequest implements Serializable { + private static final long serialVersionUID = 1L; + + private Map<String, Object> options; + + public AtlasImportRequest() { + this.options = new HashMap<>(); + } + + public AtlasImportRequest(Map<String, Object> options) { + this.options = options; + } + + public Map<String, Object> getOptions() { return options; } + + public void setOptions(Map<String, Object> options) { this.options = options; } + + public StringBuilder toString(StringBuilder sb) { + if (sb == null) { + sb = new StringBuilder(); + } + + sb.append("AtlasImportRequest{"); + sb.append("options={"); + AtlasBaseTypeDef.dumpObjects(options, sb); + sb.append("}"); + sb.append("}"); + + return sb; + } + + @Override + public String toString() { + return toString(new StringBuilder()).toString(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java ---------------------------------------------------------------------- diff --git 
a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java new file mode 100644 index 0000000..a5eeef1 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.model.impexp; + +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +import java.util.HashMap; +import java.util.Map; + +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown=true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class AtlasImportResult { + private static final long serialVersionUID = 1L; + + public enum OperationStatus { + SUCCESS, PARTIAL_SUCCESS, FAIL + } + + private AtlasImportRequest request; + private String userName; + private String clientIpAddress; + private String hostName; + private long timeStamp; + private Map<String, Integer> metrics; + private OperationStatus operationStatus; + + public AtlasImportResult() { + this(null, null, null, null, System.currentTimeMillis()); + } + + public AtlasImportResult(AtlasImportRequest request, String userName, + String clientIpAddress, String hostName, long timeStamp) { + this.request = request; + this.userName = userName; + this.clientIpAddress = clientIpAddress; + this.hostName = hostName; + this.timeStamp = timeStamp; + this.metrics = new HashMap<>(); + this.operationStatus = OperationStatus.FAIL; + } + + public AtlasImportRequest getRequest() { + return request; + } + + public void setRequest(AtlasImportRequest request) { + this.request = request; + } + + public String getUserName() { + return userName; + } + + public void 
setUserName(String userName) { + this.userName = userName; + } + + public String getClientIpAddress() { + return clientIpAddress; + } + + public void setClientIpAddress(String clientIpAddress) { + this.clientIpAddress = clientIpAddress; + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + public Map<String, Integer> getMetrics() { + return metrics; + } + + public void setMetrics(Map<String, Integer> metrics) { + this.metrics = metrics; + } + + public OperationStatus getOperationStatus() { + return operationStatus; + } + + public void setOperationStatus(OperationStatus operationStatus) { + this.operationStatus = operationStatus; + } + + public void incrementMeticsCounter(String key) { + incrementMeticsCounter(key, 1); + } + + public void incrementMeticsCounter(String key, int incrementBy) { + int currentValue = metrics.containsKey(key) ? 
metrics.get(key) : 0; + + metrics.put(key, currentValue + incrementBy); + } + + public StringBuilder toString(StringBuilder sb) { + if (sb == null) { + sb = new StringBuilder(); + } + + sb.append("AtlasImportResult{"); + sb.append("request={").append(request).append("}"); + sb.append(", userName='").append(userName).append("'"); + sb.append(", clientIpAddress='").append(clientIpAddress).append("'"); + sb.append(", hostName='").append(hostName).append("'"); + sb.append(", timeStamp='").append(timeStamp).append("'"); + sb.append(", metrics={"); + AtlasBaseTypeDef.dumpObjects(metrics, sb); + sb.append("}"); + + sb.append(", operationStatus='").append(operationStatus).append("'"); + sb.append("}"); + + return sb; + } + + @Override + public String toString() { + return toString(new StringBuilder()).toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 5cb276c..93f7b86 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -21,9 +21,7 @@ package org.apache.atlas.repository.store.graph.v1; import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.AtlasException; import org.apache.atlas.GraphTransaction; -import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContextV1; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasClassification; @@ -33,7 +31,6 @@ import 
org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutations; -import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.EntityGraphDiscovery; @@ -46,10 +43,8 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -303,7 +298,11 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); //Create vertices which do not exist in the repository - vertex = entityGraphMapper.createVertex(entity); + if ((entityStream instanceof EntityImportStream) && AtlasEntity.isAssigned(entity.getGuid())) { + vertex = entityGraphMapper.createVertexWithGuid(entity, entity.getGuid()); + } else { + vertex = entityGraphMapper.createVertex(entity); + } discoveryContext.addResolvedGuid(guid, vertex); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java index 397ee7e..2e71ab8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphMapper.java @@ -84,6 +84,11 
@@ public class EntityGraphMapper { } public AtlasVertex createVertex(AtlasEntity entity) { + final String guid = UUID.randomUUID().toString(); + return createVertexWithGuid(entity, guid); + } + + public AtlasVertex createVertexWithGuid(AtlasEntity entity, String guid) { if (LOG.isDebugEnabled()) { LOG.debug("==> createVertex({})", entity.getTypeName()); } @@ -96,8 +101,6 @@ public class EntityGraphMapper { AtlasGraphUtilsV1.addProperty(ret, Constants.SUPER_TYPES_PROPERTY_KEY, superTypeName); } - final String guid = UUID.randomUUID().toString(); - AtlasGraphUtilsV1.setProperty(ret, Constants.GUID_PROPERTY_KEY, guid); AtlasGraphUtilsV1.setProperty(ret, Constants.VERSION_PROPERTY_KEY, getEntityVersion(entity)); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java new file mode 100644 index 0000000..51ae312 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityImportStream.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +public interface EntityImportStream extends EntityStream { +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java index d5946f2..e773f02 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/IDBasedEntityResolver.java @@ -21,7 +21,6 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext; import org.apache.atlas.repository.store.graph.EntityResolver; @@ -34,7 +33,6 @@ import org.slf4j.LoggerFactory; public class IDBasedEntityResolver implements EntityResolver { private static final Logger LOG = LoggerFactory.getLogger(IDBasedEntityResolver.class); - private final GraphHelper graphHelper = GraphHelper.getInstance(); private final AtlasTypeRegistry typeRegistry; public 
IDBasedEntityResolver(AtlasTypeRegistry typeRegistry) { @@ -49,8 +47,8 @@ public class IDBasedEntityResolver implements EntityResolver { EntityStream entityStream = context.getEntityStream(); for (String guid : context.getReferencedGuids()) { - boolean isAssignedGuid = AtlasEntity.isAssigned(guid); - AtlasVertex vertex = isAssignedGuid ? AtlasGraphUtilsV1.findByGuid(guid) : null; + boolean isAssignedGuid = AtlasEntity.isAssigned(guid); + AtlasVertex vertex = isAssignedGuid ? AtlasGraphUtilsV1.findByGuid(guid) : null; if (vertex == null) { // if not found in the store, look if the entity is present in the stream AtlasEntity entity = entityStream.getByGuid(guid); @@ -71,7 +69,7 @@ public class IDBasedEntityResolver implements EntityResolver { if (vertex != null) { context.addResolvedGuid(guid, vertex); } else { - if (isAssignedGuid) { + if (isAssignedGuid && !(entityStream instanceof EntityImportStream)) { throw new AtlasBaseException(AtlasErrorCode.REFERENCED_ENTITY_NOT_FOUND, guid); } else { context.addLocalGuidReference(guid); @@ -81,4 +79,4 @@ public class IDBasedEntityResolver implements EntityResolver { return context; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 440d75c..2c2c16d 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -21,15 +21,18 @@ package org.apache.atlas.web.resources; import com.google.inject.Inject; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; -import org.apache.atlas.discovery.DiscoveryService; import 
org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.metrics.AtlasMetrics; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.services.MetricsService; import org.apache.atlas.authorize.AtlasActionTypes; import org.apache.atlas.authorize.AtlasResourceTypes; import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils; +import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter; import org.apache.atlas.web.service.ServiceState; @@ -53,6 +56,7 @@ import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.*; @@ -85,16 +89,19 @@ public class AdminResource { private final ServiceState serviceState; private final MetricsService metricsService; - private final DiscoveryService discoveryService; private final AtlasTypeRegistry typeRegistry; + private final AtlasTypeDefStore typesDefStore; + private final AtlasEntityStore entityStore; @Inject public AdminResource(ServiceState serviceState, MetricsService metricsService, - DiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) { + AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore, + AtlasEntityStore entityStore) { this.serviceState = serviceState; this.metricsService = metricsService; - this.discoveryService = discoveryService; this.typeRegistry = typeRegistry; + this.typesDefStore = typeDefStore; + this.entityStore = entityStore; } /** @@ -272,6 +279,10 @@ public class AdminResource { @Path("/export") @Consumes(Servlets.JSON_MEDIA_TYPE) public Response 
export(AtlasExportRequest request) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AdminResource.export()"); + } + ZipSink exportSink = null; try { exportSink = new ZipSink(); @@ -292,16 +303,85 @@ public class AdminResource { outStream.flush(); return Response.ok().build(); - } catch (AtlasException | IOException ex) { - LOG.error("export() failed", ex); + } catch (IOException excp) { + LOG.error("export() failed", excp); - throw toAtlasBaseException(new AtlasException(ex)); + throw new AtlasBaseException(excp); } finally { - if (exportSink != null) + if (exportSink != null) { exportSink.close(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== AdminResource.export()"); + } } } + @POST + @Path("/import") + @Produces(Servlets.JSON_MEDIA_TYPE) + @Consumes(Servlets.BINARY) + public AtlasImportResult importData(byte[] bytes) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> AdminResource.importData(bytes.length={})", bytes.length); + } + + AtlasImportResult result; + + try { + AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest)); + ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); + ImportService importService = new ImportService(this.typesDefStore, this.entityStore); + + ZipSource zipSource = new ZipSource(inputStream); + + result = importService.run(zipSource, request, Servlets.getUserName(httpServletRequest), + Servlets.getHostName(httpServletRequest), + Servlets.getRequestIpAddress(httpServletRequest)); + } catch (Exception excp) { + LOG.error("importData(binary) failed", excp); + + throw new AtlasBaseException(excp); + } finally { + if (LOG.isDebugEnabled()) { + LOG.debug("<== AdminResource.importData(binary)"); + } + } + + return result; + } + + @POST + @Path("/importfile") + @Produces(Servlets.JSON_MEDIA_TYPE) + public AtlasImportResult importFile() throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> 
AdminResource.importFile()"); + } + + AtlasImportResult result; + + try { + AtlasImportRequest request = new AtlasImportRequest(Servlets.getParameterMap(httpServletRequest)); + ImportService importService = new ImportService(this.typesDefStore, this.entityStore); + + result = importService.run(request, Servlets.getUserName(httpServletRequest), + Servlets.getHostName(httpServletRequest), + Servlets.getRequestIpAddress(httpServletRequest)); + } catch (Exception excp) { + LOG.error("importFile() failed", excp); + + throw new AtlasBaseException(excp); + } finally { + if (LOG.isDebugEnabled()) { + LOG.debug("<== AdminResource.importFile()"); + } + } + + return result; + } + private String getEditableEntityTypes(PropertiesConfiguration config) { String ret = DEFAULT_EDITABLE_ENTITY_TYPES; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java index 4499b9c..7d3d442 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ExportService.java @@ -23,6 +23,7 @@ import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever; +import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; @@ -35,7 +36,7 @@ import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.script.ScriptException; +import javax.script.*; import java.util.*; 
@@ -46,18 +47,25 @@ public class ExportService { private final AtlasGraph atlasGraph; private final EntityGraphRetriever entityGraphRetriever; - public ExportService(final AtlasTypeRegistry typeRegistry) { + // query engine support + private ScriptEngineManager scriptEngineManager; + private ScriptEngine scriptEngine; + private Bindings bindings; + private final String gremlinQuery = "g.V('__guid', startGuid).bothE().bothV().has('__guid').__guid.dedup().toList()"; + public ExportService(final AtlasTypeRegistry typeRegistry) { this.typeRegistry = typeRegistry; this.entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry); this.atlasGraph = AtlasGraphProvider.getGraphInstance(); + + initScriptEngine(); } private class ExportContext { - final Map<String, Boolean> entitiesToBeProcessed = new HashMap<>(); - final AtlasExportResult result; - final ZipSink sink; - long numOfEntitiesExported = 0; + final Set<String> guidsProcessed = new HashSet<>(); + final List<String> guidsToProcess = new ArrayList<>(); + final AtlasExportResult result; + final ZipSink sink; ExportContext(AtlasExportResult result, ZipSink sink) { this.result = result; @@ -66,26 +74,24 @@ public class ExportService { } public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName, - String requestingIP) throws AtlasException { + String requestingIP) throws AtlasBaseException { - ExportContext context = new ExportContext(new AtlasExportResult(request, userName, hostName, requestingIP, System.currentTimeMillis()), exportSink); + ExportContext context = new ExportContext(new AtlasExportResult(request, userName, hostName, requestingIP, + System.currentTimeMillis()), exportSink); try { LOG.info("==> export(user={}, from={})", userName, requestingIP); - int i = 0; for (AtlasObjectId item : request.getItemsToExport()) { - process(Integer.toString(i++), item, context); + processObjectId(item, context); } 
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder()); context.sink.setTypesDef(context.result.getData().getTypesDef()); - - context.result.getData().clear(); + context.result.setData(null); context.result.setOperationStatus(AtlasExportResult.OperationStatus.SUCCESS); context.sink.setResult(context.result); - } - catch(Exception ex) { + } catch(Exception ex) { LOG.error("Operation failed: ", ex); } finally { LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus()); @@ -94,80 +100,86 @@ public class ExportService { return context.result; } - private void process(String folder, AtlasObjectId item, ExportContext context) throws AtlasServiceException, AtlasException, AtlasBaseException { + private void processObjectId(AtlasObjectId item, ExportContext context) throws AtlasServiceException, AtlasException, AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> processObjectId({})", item); + } + try { - AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item); - List<String> queue = populateConnectedEntities(entity.getGuid(), context); + AtlasEntity entity = entityGraphRetriever.toAtlasEntity(item); - process(entity, context); + processEntity(entity, context); - for (String guid : queue) { - if(context.entitiesToBeProcessed.get(guid)) { - continue; - } + while (!context.guidsToProcess.isEmpty()) { + String guid = context.guidsToProcess.remove(0); - process(entityGraphRetriever.toAtlasEntity(guid), context); - } + entity = entityGraphRetriever.toAtlasEntity(guid); - context.result.getData().getEntityCreationOrder().put(folder, queue); - } catch (AtlasBaseException e) { + processEntity(entity, context); + } + } catch (AtlasBaseException excp) { context.result.setOperationStatus(AtlasExportResult.OperationStatus.PARTIAL_SUCCESS); - LOG.error("Fetching entity failed for: {}", item); + LOG.error("Fetching entity failed for: {}", item, excp); } - } - private void 
process(AtlasEntity entity, ExportContext context) throws AtlasBaseException, AtlasException { - addTypesAsNeeded(entity.getTypeName(), context); - addClassificationsAsNeeded(entity, context); - addEntity(entity, context); + if (LOG.isDebugEnabled()) { + LOG.debug("<== processObjectId({})", item); + } } - private void addEntity(AtlasEntity entity, ExportContext context) throws AtlasException, AtlasBaseException { - context.entitiesToBeProcessed.put(entity.getGuid(), true); - context.sink.add(entity); + private void processEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> processEntity({})", entity.getAtlasObjectId()); + } - context.result.incrementMeticsCounter(String.format("entity:%s", entity.getTypeName())); - context.result.incrementMeticsCounter("Entities"); + if (!context.guidsProcessed.contains(entity.getGuid())) { + addTypesAsNeeded(entity.getTypeName(), context); + addClassificationsAsNeeded(entity, context); + addEntity(entity, context); - context.numOfEntitiesExported++; + context.guidsProcessed.add(entity.getGuid()); + context.result.getData().getEntityCreationOrder().add(entity.getGuid()); - if (context.numOfEntitiesExported % 10 == 0) { - LOG.info("export(): in progress.. 
number of entities exported: {}", context.numOfEntitiesExported); + getConnectedEntityGuids(entity, context); } - } - private List<String> populateConnectedEntities(String startGuid, ExportContext context) { - final String gremlinQuery = "g.V('__guid', '%s').bothE().bothV().has('__guid').__guid.toList()"; + if (LOG.isDebugEnabled()) { + LOG.debug("<== processEntity({})", entity.getAtlasObjectId()); + } + } - Map<String, Boolean> entitiesToBeProcessed = context.entitiesToBeProcessed; + private void getConnectedEntityGuids(AtlasEntity entity, ExportContext context) { - List<String> queue = new ArrayList<>(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("==> getConnectedEntityGuids({}): guidsToProcess {}", entity.getAtlasObjectId(), context.guidsToProcess.size()); + } - entitiesToBeProcessed.put(startGuid, false); - queue.add(startGuid); + List<String> result = executeGremlinScriptFor(entity.getGuid()); + for (String guid : result) { + if (!context.guidsProcessed.contains(guid)) { + context.guidsToProcess.add(guid); + } + } - for (int i=0; i < queue.size(); i++) { - String currentGuid = queue.get(i); + if (LOG.isDebugEnabled()) { + LOG.debug("<== getConnectedEntityGuids({}): found {} guids; guidsToProcess {}", entity.getGuid(), result.size(), context.guidsToProcess.size()); + } + } catch (ScriptException e) { + LOG.error("Child entities could not be added for %s", entity.getGuid()); + } + } - try { - List<String> result = (List<String>) atlasGraph.executeGremlinScript( - String.format(gremlinQuery, currentGuid), false); + private void addEntity(AtlasEntity entity, ExportContext context) throws AtlasBaseException { + context.sink.add(entity); - for (String guid : result) { - if (entitiesToBeProcessed.containsKey(guid)) { - continue; - } + context.result.incrementMeticsCounter(String.format("entity:%s", entity.getTypeName())); + context.result.incrementMeticsCounter("Entities"); - entitiesToBeProcessed.put(guid, false); - queue.add(guid); - } - } catch 
(ScriptException e) { - LOG.error("Child entities could not be added for %s", currentGuid); - } + if (context.guidsProcessed.size() % 10 == 0) { + LOG.info("export(): in progress.. number of entities exported: {}", context.guidsProcessed.size()); } - - return queue; } private void addClassificationsAsNeeded(AtlasEntity entity, ExportContext context) { @@ -199,4 +211,23 @@ public class ExportService { result.incrementMeticsCounter("Type(s)"); } } + + private List<String> executeGremlinScriptFor(String guid) throws ScriptException { + + bindings.put("startGuid", guid); + return (List<String>) atlasGraph.executeGremlinScript(this.scriptEngine, this.bindings, this.gremlinQuery, false); + } + + private void initScriptEngine() { + if (scriptEngineManager != null) { + return; + } + + scriptEngineManager = new ScriptEngineManager(); + scriptEngine = scriptEngineManager.getEngineByName("gremlin-groovy"); + bindings = scriptEngine.createBindings(); + + //Do not cache script compilations due to memory implications + scriptEngine.getContext().setAttribute("#jsr223.groovy.engine.keep.globals", "phantom", ScriptContext.ENGINE_SCOPE); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java new file mode 100644 index 0000000..7554cdb --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ImportService.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.web.resources; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.typedef.*; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.commons.io.FileUtils; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.List; + + +public class ImportService { + private static final Logger LOG = LoggerFactory.getLogger(ImportService.class); + + private final AtlasTypeDefStore typeDefStore; + private final AtlasEntityStore entityStore; + + + public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore) { + this.typeDefStore = typeDefStore; + this.entityStore = entityStore; + } + + public AtlasImportResult run(ZipSource source, AtlasImportRequest request, String userName, + String hostName, String requestingIP) throws AtlasBaseException { + AtlasImportResult result = new AtlasImportResult(request, userName, requestingIP, hostName, System.currentTimeMillis()); + + try { + LOG.info("==> import(user={}, from={})", userName, requestingIP); + + processTypes(source.getTypesDef(), 
result); + processEntities(source, result); + + result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS); + } catch (AtlasBaseException excp) { + LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp); + + throw excp; + } catch (Exception excp) { + LOG.error("import(user={}, from={}): failed", userName, requestingIP, excp); + + throw new AtlasBaseException(excp); + } finally { + try { + source.close(); + } catch (IOException e) { + // ignore + } + + LOG.info("<== import(user={}, from={}): status={}", userName, requestingIP, result.getOperationStatus()); + } + + return result; + } + + public AtlasImportResult run(AtlasImportRequest request, String userName, String hostName, String requestingIP) + throws AtlasBaseException { + String fileName = (String)request.getOptions().get("FILENAME"); + + if (StringUtils.isBlank(fileName)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "FILENAME parameter not found"); + } + + AtlasImportResult result = null; + + try { + LOG.info("==> import(user={}, from={}, fileName={})", userName, requestingIP, fileName); + + File file = new File(fileName); + ZipSource source = new ZipSource(new ByteArrayInputStream(FileUtils.readFileToByteArray(file))); + + result = run(source, request, userName, hostName, requestingIP); + } catch (AtlasBaseException excp) { + LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp); + + throw excp; + } catch (FileNotFoundException excp) { + LOG.error("import(user={}, from={}, fileName={}): file not found", userName, requestingIP, excp); + + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, fileName + ": file not found"); + } catch (Exception excp) { + LOG.error("import(user={}, from={}, fileName={}): failed", userName, requestingIP, excp); + + throw new AtlasBaseException(excp); + } finally { + LOG.info("<== import(user={}, from={}, fileName={}): status={}", userName, requestingIP, fileName, + (result == null 
? AtlasImportResult.OperationStatus.FAIL : result.getOperationStatus())); + } + + return result; + } + + private void processTypes(AtlasTypesDef typeDefinitionMap, AtlasImportResult result) throws AtlasBaseException { + setGuidToEmpty(typeDefinitionMap.getEntityDefs()); + typeDefStore.updateTypesDef(typeDefinitionMap); + + result.incrementMeticsCounter("Enum(s)", typeDefinitionMap.getEnumDefs().size()); + result.incrementMeticsCounter("Struct(s)", typeDefinitionMap.getStructDefs().size()); + result.incrementMeticsCounter("Classification(s)", typeDefinitionMap.getClassificationDefs().size()); + result.incrementMeticsCounter("Entity definition(s)", typeDefinitionMap.getEntityDefs().size()); + } + + private void setGuidToEmpty(List<AtlasEntityDef> entityDefList) { + for (AtlasEntityDef edf: entityDefList) { + edf.setGuid(""); + } + } + + private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException { + this.entityStore.createOrUpdate(importSource, false); + result.incrementMeticsCounter("Entities", importSource.getCreationOrder().size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/webapp/src/main/java/org/apache/atlas/web/resources/ZipExportFileNames.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipExportFileNames.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipExportFileNames.java new file mode 100644 index 0000000..c41ff56 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipExportFileNames.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.web.resources; + +public enum ZipExportFileNames { + ATLAS_EXPORT_INFO_NAME("atlas-export-info"), + ATLAS_EXPORT_ORDER_NAME("atlas-export-order"), + ATLAS_TYPESDEF_NAME("atlas-typesdef"); + + public final String name; + ZipExportFileNames(String name) { + this.name = name; + } + + @Override + public String toString() { + return this.name; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java index 4c1ca6c..2e4cb01 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSink.java @@ -54,21 +54,18 @@ public class ZipSink { } public void setResult(AtlasExportResult result) throws AtlasBaseException { - final String fileName = "atlas-export-info"; String jsonData = convertToJSON(result); - saveToZip(fileName, jsonData); + saveToZip(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME, jsonData); } public void setTypesDef(AtlasTypesDef typesDef) throws AtlasBaseException { - final String fileName = "atlas-typesdef"; String jsonData = convertToJSON(typesDef); - saveToZip(fileName, jsonData); + 
saveToZip(ZipExportFileNames.ATLAS_TYPESDEF_NAME, jsonData); } - public void setExportOrder(Map<String, List<String>> result) throws AtlasBaseException { - final String fileName = "atlas-export-order"; + public void setExportOrder(List<String> result) throws AtlasBaseException { String jsonData = convertToJSON(result); - saveToZip(fileName, jsonData); + saveToZip(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME, jsonData); } public void writeTo(OutputStream stream) throws IOException { @@ -90,9 +87,13 @@ public class ZipSink { return AtlasType.toJson(entity); } + private void saveToZip(ZipExportFileNames fileName, String jsonData) throws AtlasBaseException { + saveToZip(fileName.toString(), jsonData); + } + private void saveToZip(String fileName, String jsonData) throws AtlasBaseException { try { - addToZipStream(fileName + ".json", jsonData); + addToZipStream(fileName.toString() + ".json", jsonData); } catch (IOException e) { throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java new file mode 100644 index 0000000..ea62862 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/web/resources/ZipSource.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.web.resources; + +import org.codehaus.jackson.type.TypeReference; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.repository.store.graph.v1.EntityImportStream; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.Iterator; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + + +public class ZipSource implements EntityImportStream { + private static final Logger LOG = LoggerFactory.getLogger(ZipSource.class); + + private final ByteArrayInputStream inputStream; + private List<String> creationOrder; + private Iterator<String> iterator; + + public ZipSource(ByteArrayInputStream inputStream) { + this.inputStream = inputStream; + + this.setCreationOrder(); + } + + public AtlasTypesDef getTypesDef() throws AtlasBaseException { + final String fileName = ZipExportFileNames.ATLAS_TYPESDEF_NAME.toString(); + + try { + String s = get(fileName); + return convertFromJson(AtlasTypesDef.class, s); + } catch (IOException e) { + LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e); + return null; + } + } + + public AtlasExportResult getExportResult() throws AtlasBaseException { + String fileName = ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toString(); + try { + String s = get(fileName); + 
return convertFromJson(AtlasExportResult.class, s); + } catch (IOException e) { + LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e); + return null; + } + } + + + private void setCreationOrder() { + String fileName = ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toString(); + + try { + String s = get(fileName); + this.creationOrder = convertFromJson(new TypeReference<List<String>>(){}, s); + this.iterator = this.creationOrder.iterator(); + } catch (IOException e) { + LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e); + } catch (AtlasBaseException e) { + LOG.error(String.format("Error retrieving '%s' from zip.", fileName), e); + } + } + + public List<String> getCreationOrder() throws AtlasBaseException { + return this.creationOrder; + } + + public AtlasEntity getEntity(String guid) throws AtlasBaseException { + try { + String s = get(guid); + return convertFromJson(AtlasEntity.class, s); + } catch (IOException e) { + LOG.error(String.format("Error retrieving '%s' from zip.", guid), e); + return null; + } + } + + private String get(String entryName) throws IOException { + String ret = ""; + + inputStream.reset(); + + ZipInputStream zipInputStream = new ZipInputStream(inputStream); + ZipEntry zipEntry = zipInputStream.getNextEntry(); + + entryName = entryName + ".json"; + + while (zipEntry != null) { + if (zipEntry.getName().equals(entryName)) { + break; + } + + zipEntry = zipInputStream.getNextEntry(); + } + + if (zipEntry != null) { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + byte[] buf = new byte[1024]; + + int n = 0; + while ((n = zipInputStream.read(buf, 0, 1024)) > -1) { + os.write(buf, 0, n); + } + + ret = os.toString(); + } else { + LOG.warn("{}: no such entry in zip file", entryName); + } + + zipInputStream.close(); + + return ret; + } + + private <T> T convertFromJson(TypeReference clazz, String jsonData) throws AtlasBaseException { + try { + ObjectMapper mapper = new ObjectMapper(); + + return 
mapper.readValue(jsonData, clazz); + + } catch (Exception e) { + throw new AtlasBaseException("Error converting file to JSON.", e); + } + } + + private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException { + try { + ObjectMapper mapper = new ObjectMapper(); + + return mapper.readValue(jsonData, clazz); + + } catch (Exception e) { + throw new AtlasBaseException("Error converting file to JSON.", e); + } + } + + public void close() throws IOException { + inputStream.close(); + } + + @Override + public boolean hasNext() { + return this.iterator.hasNext(); + } + + @Override + public AtlasEntity next() { + try { + return getEntity(this.iterator.next()); + } catch (AtlasBaseException e) { + e.printStackTrace(); + return null; + } + } + + @Override + public void reset() { + try { + getCreationOrder(); + this.iterator = this.creationOrder.iterator(); + } catch (AtlasBaseException e) { + LOG.error("reset", e); + } + } + + @Override + public AtlasEntity getByGuid(String guid) { + try { + return getEntity(guid); + } catch (AtlasBaseException e) { + e.printStackTrace(); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java b/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java index 8ff5d04..c5c0731 100755 --- a/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java +++ b/webapp/src/main/java/org/apache/atlas/web/util/Servlets.java @@ -21,6 +21,7 @@ package org.apache.atlas.web.util; import org.apache.atlas.AtlasClient; import org.apache.atlas.LocalServletRequest; import org.apache.atlas.utils.ParamChecker; +import org.apache.commons.collections.MapUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.StringUtils; @@ 
-41,7 +42,9 @@ import java.io.StringWriter; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.charset.Charset; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Utility functions for dealing with servlets. @@ -193,4 +196,23 @@ public final class Servlets { public static String getUserName(HttpServletRequest httpServletRequest) throws IOException { return httpServletRequest.getRemoteUser(); } + + public static Map<String, Object> getParameterMap(HttpServletRequest request) { + Map<String, Object> attributes = new HashMap<>(); + + if (MapUtils.isNotEmpty(request.getParameterMap())) { + for (Map.Entry<String, String[]> e : request.getParameterMap().entrySet()) { + String key = e.getKey(); + + if (key != null) { + String[] values = e.getValue(); + String value = values != null && values.length > 0 ? values[0] : null; + + attributes.put(key, value); + } + } + } + + return attributes; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6170063c/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java index c3d4b72..c0bbf09 100644 --- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java @@ -48,7 +48,7 @@ public class AdminResourceTest { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null); Response response = adminResource.getStatus(); assertEquals(response.getStatus(), HttpServletResponse.SC_OK); JSONObject entity = (JSONObject) 
response.getEntity(); @@ -59,7 +59,7 @@ public class AdminResourceTest { public void testResourceGetsValueFromServiceState() throws JSONException { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null); Response response = adminResource.getStatus(); verify(serviceState).getState();