Repository: atlas
Updated Branches:
  refs/heads/master 75415862c -> 54c31d5c8


ATLAS-2545: updated Storm hook to use V2 notifications

Signed-off-by: Madhan Neethiraj <mad...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/54c31d5c
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/54c31d5c
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/54c31d5c

Branch: refs/heads/master
Commit: 54c31d5c8e601757e19e26d3c30f2414532e2f8f
Parents: 7541586
Author: rdsolani <rdsol...@gmail.com>
Authored: Mon Apr 9 16:57:36 2018 +0530
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Tue Apr 10 13:48:27 2018 -0700

----------------------------------------------------------------------
 addons/storm-bridge/pom.xml                     |  44 +++
 .../apache/atlas/storm/hook/StormAtlasHook.java | 394 +++++++++----------
 pom.xml                                         |   6 +
 3 files changed, 247 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/54c31d5c/addons/storm-bridge/pom.xml
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml
index 3446dcb..484902c 100644
--- a/addons/storm-bridge/pom.xml
+++ b/addons/storm-bridge/pom.xml
@@ -254,6 +254,11 @@
                                             
<artifactId>commons-configuration</artifactId>
                                             
<version>${commons-conf.version}</version>
                                         </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>org.apache.commons</groupId>
+                                            
<artifactId>commons-configuration2</artifactId>
+                                            
<version>${commons-conf2.version}</version>
+                                        </artifactItem>
                                         <artifactItem>
                                             <groupId>commons-logging</groupId>
                                             
<artifactId>commons-logging</artifactId>
@@ -295,6 +300,40 @@
                                             
<version>${hadoop.version}</version>
                                         </artifactItem>
                                         <artifactItem>
+                                            
<groupId>org.apache.hadoop</groupId>
+                                            
<artifactId>hadoop-auth</artifactId>
+                                            
<version>${hadoop.version}</version>
+                                        </artifactItem>
+
+                                        <artifactItem>
+                                            
<groupId>com.fasterxml.jackson.core</groupId>
+                                            
<artifactId>jackson-databind</artifactId>
+                                            
<version>${jackson.version}</version>
+                                        </artifactItem>
+
+                                        <artifactItem>
+                                            
<groupId>com.fasterxml.jackson.core</groupId>
+                                            
<artifactId>jackson-core</artifactId>
+                                            
<version>${jackson.version}</version>
+                                        </artifactItem>
+
+                                        <artifactItem>
+                                            
<groupId>com.fasterxml.jackson.core</groupId>
+                                            
<artifactId>jackson-annotations</artifactId>
+                                            
<version>${jackson.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>org.codehaus.woodstox</groupId>
+                                            <artifactId>stax2-api</artifactId>
+                                            
<version>${codehaus.woodstox.stax2-api.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>org.apache.hadoop</groupId>
+                                            
<artifactId>hadoop-hdfs-client</artifactId>
+                                            
<version>${hadoop.hdfs-client.version}</version>
+                                        </artifactItem>
+
+                                        <artifactItem>
                                             <groupId>log4j</groupId>
                                             <artifactId>log4j</artifactId>
                                             <version>${log4j.version}</version>
@@ -309,6 +348,11 @@
                                             <artifactId>jsr311-api</artifactId>
                                             <version>${jsr.version}</version>
                                         </artifactItem>
+                                        <artifactItem>
+                                            
<groupId>com.fasterxml.woodstox</groupId>
+                                            
<artifactId>woodstox-core</artifactId>
+                                            
<version>${woodstox-core.version}</version>
+                                        </artifactItem>
                                     </artifactItems>
                                 </configuration>
                             </execution>

http://git-wip-us.apache.org/repos/asf/atlas/blob/54c31d5c/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git 
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
 
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 110ec52..7f725a4 100644
--- 
a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ 
b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -18,8 +18,14 @@
 
 package org.apache.atlas.storm.hook;
 
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import 
org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.atlas.utils.HdfsNameServiceResolver;
-import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.storm.ISubmitterHook;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.SpoutSpec;
@@ -40,6 +46,7 @@ import org.slf4j.Logger;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,17 +61,13 @@ import java.util.Date;
  * for the various lifecycle stages.
  */
 public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
-
     public static final Logger LOG = 
org.slf4j.LoggerFactory.getLogger(StormAtlasHook.class);
 
-    private static final String CONF_PREFIX = "atlas.hook.storm.";
-    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-    // will be used for owner if Storm topology does not contain the owner 
instance
-    // possible if Storm is running in unsecure mode.
-    public static final String ANONYMOUS_OWNER = "anonymous";
-
-    public static final String HBASE_NAMESPACE_DEFAULT = "default";
-    public static final String ATTRIBUTE_DB = "db";
+    private static final String CONF_PREFIX             = "atlas.hook.storm.";
+    private static final String HOOK_NUM_RETRIES        = CONF_PREFIX + 
"numRetries";
+    public  static final String ANONYMOUS_OWNER         = "anonymous"; // if 
Storm topology does not contain the owner instance; possible if Storm is 
running in unsecure mode.
+    public  static final String HBASE_NAMESPACE_DEFAULT = "default";
+    public  static final String ATTRIBUTE_DB            = "db";
 
     private final HdfsNameServiceResolver hdfsNameServiceResolver = 
HdfsNameServiceResolver.getInstance();
 
@@ -81,112 +84,103 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
      * @param stormTopology a storm topology
      */
     @Override
-    public void notify(TopologyInfo topologyInfo, Map stormConf,
-                       StormTopology stormTopology) {
-
+    public void notify(TopologyInfo topologyInfo, Map stormConf, StormTopology 
stormTopology) {
         LOG.info("Collecting metadata for a new storm topology: {}", 
topologyInfo.get_name());
+
         try {
-            ArrayList<Referenceable> entities = new ArrayList<>();
-            Referenceable topologyReferenceable = 
createTopologyInstance(topologyInfo, stormConf);
-            List<Referenceable> dependentEntities = 
addTopologyDataSets(stormTopology, topologyReferenceable,
-                    topologyInfo.get_owner(), stormConf);
-            if (dependentEntities.size()>0) {
-                entities.addAll(dependentEntities);
-            }
+            String                   user     = 
getUser(topologyInfo.get_owner(), null);
+            AtlasEntity              topology = 
createTopologyInstance(topologyInfo, stormConf);
+            AtlasEntitiesWithExtInfo entity   = new 
AtlasEntitiesWithExtInfo(topology);
+
+            addTopologyDataSets(stormTopology, topologyInfo.get_owner(), 
stormConf, topology, entity);
+
             // create the graph for the topology
-            ArrayList<Referenceable> graphNodes = createTopologyGraph(
-                    stormTopology, stormTopology.get_spouts(), 
stormTopology.get_bolts());
-            // add the connection from topology to the graph
-            topologyReferenceable.set("nodes", graphNodes);
-            entities.add(topologyReferenceable);
-
-            LOG.debug("notifying entities, size = {}", entities.size());
-            String user = getUser(topologyInfo.get_owner(), null);
-            notifyEntities(user, entities);
+            List<AtlasEntity> graphNodes = createTopologyGraph(stormTopology, 
stormTopology.get_spouts(), stormTopology.get_bolts());
+
+            if (CollectionUtils.isNotEmpty(graphNodes)) {
+                // add the connection from topology to the graph
+                topology.setAttribute("nodes", 
AtlasTypeUtil.getAtlasObjectIds(graphNodes));
+
+                for (AtlasEntity graphNode : graphNodes) {
+                    entity.addReferredEntity(graphNode);
+                }
+            }
+
+            List<HookNotification> hookNotifications = 
Collections.singletonList(new EntityCreateRequestV2(user, entity));
+
+            notifyEntities(hookNotifications);
         } catch (Exception e) {
             throw new RuntimeException("Atlas hook is unable to process the 
topology.", e);
         }
     }
 
-    private Referenceable createTopologyInstance(TopologyInfo topologyInfo, 
Map stormConf) {
-        Referenceable topologyReferenceable = new Referenceable(
-                StormDataTypes.STORM_TOPOLOGY.getName());
-        topologyReferenceable.set("id", topologyInfo.get_id());
-        topologyReferenceable.set(AtlasClient.NAME, topologyInfo.get_name());
-        topologyReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
topologyInfo.get_name());
-        String owner = topologyInfo.get_owner();
+    private AtlasEntity createTopologyInstance(TopologyInfo topologyInfo, Map 
stormConf) {
+        AtlasEntity topology = new 
AtlasEntity(StormDataTypes.STORM_TOPOLOGY.getName());
+        String      owner    = topologyInfo.get_owner();
+
         if (StringUtils.isEmpty(owner)) {
             owner = ANONYMOUS_OWNER;
         }
-        topologyReferenceable.set(AtlasClient.OWNER, owner);
-        topologyReferenceable.set("startTime", new 
Date(System.currentTimeMillis()));
-        topologyReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, 
getClusterName(stormConf));
 
-        return topologyReferenceable;
+        topology.setAttribute("id", topologyInfo.get_id());
+        topology.setAttribute(AtlasClient.NAME, topologyInfo.get_name());
+        topology.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
topologyInfo.get_name());
+        topology.setAttribute(AtlasClient.OWNER, owner);
+        topology.setAttribute("startTime", new 
Date(System.currentTimeMillis()));
+        topology.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, 
getClusterName(stormConf));
+
+        return topology;
     }
 
-    private List<Referenceable> addTopologyDataSets(StormTopology 
stormTopology,
-                                                    Referenceable 
topologyReferenceable,
-                                                    String topologyOwner,
-                                                    Map stormConf) {
-        List<Referenceable> dependentEntities = new ArrayList<>();
+    private void addTopologyDataSets(StormTopology stormTopology, String 
topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo 
entityExtInfo) {
         // add each spout as an input data set
-        addTopologyInputs(topologyReferenceable,
-                stormTopology.get_spouts(), stormConf, topologyOwner, 
dependentEntities);
+        addTopologyInputs(stormTopology.get_spouts(), stormConf, 
topologyOwner, topology, entityExtInfo);
+
         // add the appropriate bolts as output data sets
-        addTopologyOutputs(topologyReferenceable, stormTopology, 
topologyOwner, stormConf, dependentEntities);
-        return dependentEntities;
+        addTopologyOutputs(stormTopology, topologyOwner, stormConf, topology, 
entityExtInfo);
     }
 
-    private void addTopologyInputs(Referenceable topologyReferenceable,
-                                   Map<String, SpoutSpec> spouts,
-                                   Map stormConf,
-                                   String topologyOwner, List<Referenceable> 
dependentEntities) {
-        final ArrayList<Referenceable> inputDataSets = new ArrayList<>();
+    private void addTopologyInputs(Map<String, SpoutSpec> spouts, Map 
stormConf, String topologyOwner, AtlasEntity topology, AtlasEntityExtInfo 
entityExtInfo) {
+        List<AtlasEntity> inputs = new ArrayList<>();
+
         for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
-            Serializable instance = Utils.javaDeserialize(
-                    entry.getValue().get_spout_object().get_serialized_java(), 
Serializable.class);
+            Serializable instance = 
Utils.javaDeserialize(entry.getValue().get_spout_object().get_serialized_java(),
 Serializable.class);
+            String       dsType   = instance.getClass().getSimpleName();
+            AtlasEntity  dsEntity = addDataSet(dsType, topologyOwner, 
instance, stormConf, entityExtInfo);
 
-            String simpleName = instance.getClass().getSimpleName();
-            final Referenceable datasetRef = createDataSet(simpleName, 
topologyOwner, instance, stormConf, dependentEntities);
-            if (datasetRef != null) {
-                inputDataSets.add(datasetRef);
+            if (dsEntity != null) {
+                inputs.add(dsEntity);
             }
         }
 
-        topologyReferenceable.set("inputs", inputDataSets);
+        topology.setAttribute("inputs", 
AtlasTypeUtil.getAtlasObjectIds(inputs));
     }
 
-    private void addTopologyOutputs(Referenceable topologyReferenceable,
-                                    StormTopology stormTopology, String 
topologyOwner,
-                                    Map stormConf, List<Referenceable> 
dependentEntities) {
-        final ArrayList<Referenceable> outputDataSets = new ArrayList<>();
-
-        Map<String, Bolt> bolts = stormTopology.get_bolts();
-        Set<String> terminalBoltNames = 
StormTopologyUtil.getTerminalUserBoltNames(stormTopology);
-        for (String terminalBoltName : terminalBoltNames) {
-            Serializable instance = 
Utils.javaDeserialize(bolts.get(terminalBoltName)
-                    .get_bolt_object().get_serialized_java(), 
Serializable.class);
-
-            String dataSetType = instance.getClass().getSimpleName();
-            final Referenceable datasetRef = createDataSet(dataSetType, 
topologyOwner, instance, stormConf, dependentEntities);
-            if (datasetRef != null) {
-                outputDataSets.add(datasetRef);
+    private void addTopologyOutputs(StormTopology stormTopology, String 
topologyOwner, Map stormConf, AtlasEntity topology, AtlasEntityExtInfo 
entityExtInfo) {
+        List<AtlasEntity> outputs   = new ArrayList<>();
+        Map<String, Bolt> bolts     = stormTopology.get_bolts();
+        Set<String>       boltNames = 
StormTopologyUtil.getTerminalUserBoltNames(stormTopology);
+
+        for (String boltName : boltNames) {
+            Serializable instance = 
Utils.javaDeserialize(bolts.get(boltName).get_bolt_object().get_serialized_java(),
 Serializable.class);
+            String       dsType   = instance.getClass().getSimpleName();
+            AtlasEntity  dsEntity = addDataSet(dsType, topologyOwner, 
instance, stormConf, entityExtInfo);
+
+            if (dsEntity != null) {
+                outputs.add(dsEntity);
             }
         }
 
-        topologyReferenceable.set("outputs", outputDataSets);
+        topology.setAttribute("outputs", 
AtlasTypeUtil.getAtlasObjectIds(outputs));
     }
 
-    private Referenceable createDataSet(String name, String topologyOwner,
-                                              Serializable instance,
-                                              Map stormConf, 
List<Referenceable> dependentEntities) {
-        Map<String, String> config = 
StormTopologyUtil.getFieldValues(instance, true, null);
+    private AtlasEntity addDataSet(String dataSetType, String topologyOwner, 
Serializable instance, Map stormConf, AtlasEntityExtInfo entityExtInfo) {
+        Map<String, String> config      = 
StormTopologyUtil.getFieldValues(instance, true, null);
+        String              clusterName = null;
+        AtlasEntity         ret         = null;
 
-        String clusterName = null;
-        Referenceable dataSetReferenceable;
         // todo: need to redo this with a config driven approach
-        switch (name) {
+        switch (dataSetType) {
             case "KafkaSpout": {
                 String topicName = 
config.get("KafkaSpout.kafkaSpoutConfig.translator.topic");
                 String uri       = 
config.get("KafkaSpout.kafkaSpoutConfig.kafkaProps.bootstrap.servers");
@@ -199,21 +193,23 @@ public class StormAtlasHook extends AtlasHook implements 
ISubmitterHook {
                     uri = 
config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr");
                 }
 
-                dataSetReferenceable = new 
Referenceable(StormDataTypes.KAFKA_TOPIC.getName());
-                dataSetReferenceable.set("topic", topicName);
-                dataSetReferenceable.set("uri", uri);
-
                 if (StringUtils.isEmpty(topologyOwner)) {
                     topologyOwner = ANONYMOUS_OWNER;
                 }
-                dataSetReferenceable.set(AtlasClient.OWNER, topologyOwner);
-                
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
-                dataSetReferenceable.set(AtlasClient.NAME, topicName);
+
+                clusterName = getClusterName(stormConf);
+
+                ret = new AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName());
+
+                ret.setAttribute("topic", topicName);
+                ret.setAttribute("uri", uri);
+                ret.setAttribute(AtlasClient.OWNER, topologyOwner);
+                ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getKafkaTopicQualifiedName(clusterName, topicName));
+                ret.setAttribute(AtlasClient.NAME, topicName);
             }
             break;
 
             case "HBaseBolt": {
-                dataSetReferenceable = new 
Referenceable(StormDataTypes.HBASE_TABLE.getName());
                 final String hbaseTableName = 
config.get("HBaseBolt.tableName");
                 String       uri            = config.get("hbase.rootdir");
 
@@ -221,191 +217,195 @@ public class StormAtlasHook extends AtlasHook 
implements ISubmitterHook {
                     uri = hbaseTableName;
                 }
 
-                dataSetReferenceable.set("uri", hbaseTableName);
-                dataSetReferenceable.set(AtlasClient.NAME, uri);
-                dataSetReferenceable.set(AtlasClient.OWNER, 
stormConf.get("storm.kerberos.principal"));
                 clusterName = 
extractComponentClusterName(HBaseConfiguration.create(), stormConf);
+
+                ret = new AtlasEntity(StormDataTypes.HBASE_TABLE.getName());
+
+                ret.setAttribute("uri", hbaseTableName);
+                ret.setAttribute(AtlasClient.NAME, uri);
+                ret.setAttribute(AtlasClient.OWNER, 
stormConf.get("storm.kerberos.principal"));
                 //TODO - Hbase Namespace is hardcoded to 'default'. need to 
check how to get this or is it already part of tableName
-                
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT,
-                        hbaseTableName));
+                ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, 
hbaseTableName));
             }
             break;
 
-            case "HdfsBolt":
-                dataSetReferenceable = new 
Referenceable(HiveMetaStoreBridge.HDFS_PATH);
-                String hdfsUri = config.get("HdfsBolt.rotationActions") == null
-                        ? config.get("HdfsBolt.fileNameFormat.path")
-                        : config.get("HdfsBolt.rotationActions");
-
+            case "HdfsBolt": {
+                final String hdfsUri       = 
config.get("HdfsBolt.rotationActions") == null ? 
config.get("HdfsBolt.fileNameFormat.path") : 
config.get("HdfsBolt.rotationActions");
                 final String hdfsPathStr   = config.get("HdfsBolt.fsUrl") + 
hdfsUri;
+                final Path   hdfsPath      = new Path(hdfsPathStr);
                 final String nameServiceID = 
hdfsNameServiceResolver.getNameServiceIDForPath(hdfsPathStr);
 
                 clusterName = getClusterName(stormConf);
 
-                
dataSetReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, 
getClusterName(stormConf));
+                ret = new AtlasEntity(HiveMetaStoreBridge.HDFS_PATH);
+
+                ret.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, 
getClusterName(stormConf));
+                ret.setAttribute(AtlasClient.OWNER, 
stormConf.get("hdfs.kerberos.principal"));
+                ret.setAttribute(AtlasClient.NAME, 
Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
+
                 if (StringUtils.isNotEmpty(nameServiceID)) {
                     String updatedPath = 
hdfsNameServiceResolver.getPathWithNameServiceID(hdfsPathStr);
-                    dataSetReferenceable.set("path", updatedPath);
-                    dataSetReferenceable.set("nameServiceId", nameServiceID);
-                    
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getHdfsPathQualifiedName(clusterName, updatedPath));
+
+                    ret.setAttribute("path", updatedPath);
+                    ret.setAttribute("nameServiceId", nameServiceID);
+                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getHdfsPathQualifiedName(clusterName, updatedPath));
                 } else {
-                    dataSetReferenceable.set("path", hdfsPathStr);
-                    
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getHdfsPathQualifiedName(clusterName, hdfsPathStr));
+                    ret.setAttribute("path", hdfsPathStr);
+                    ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
getHdfsPathQualifiedName(clusterName, hdfsPathStr));
                 }
-                dataSetReferenceable.set(AtlasClient.OWNER, 
stormConf.get("hdfs.kerberos.principal"));
-                final Path hdfsPath = new Path(hdfsPathStr);
-                dataSetReferenceable.set(AtlasClient.NAME, 
Path.getPathWithoutSchemeAndAuthority(hdfsPath).toString().toLowerCase());
-                break;
+            }
+            break;
 
-            case "HiveBolt":
-                // todo: verify if hive table has everything needed to 
retrieve existing table
-                Referenceable dbReferenceable = new Referenceable("hive_db");
-                String databaseName = 
config.get("HiveBolt.options.databaseName");
-                dbReferenceable.set(AtlasClient.NAME, databaseName);
-                dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                        
HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), 
databaseName));
-                dbReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, 
getClusterName(stormConf));
-                dependentEntities.add(dbReferenceable);
+            case "HiveBolt": {
                 clusterName = extractComponentClusterName(new HiveConf(), 
stormConf);
-                final String hiveTableName = 
config.get("HiveBolt.options.tableName");
-                dataSetReferenceable = new Referenceable("hive_table");
-                final String tableQualifiedName = 
HiveMetaStoreBridge.getTableQualifiedName(clusterName,
-                        databaseName, hiveTableName);
-                dataSetReferenceable.set(AtlasClient.NAME, hiveTableName);
-                dataSetReferenceable.set(ATTRIBUTE_DB, dbReferenceable);
-                
dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
tableQualifiedName);
-                break;
+
+                final String dbName           = 
config.get("HiveBolt.options.databaseName");
+                final String tblName          = 
config.get("HiveBolt.options.tableName");
+                final String tblQualifiedName = 
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tblName);
+
+                AtlasEntity dbEntity = new AtlasEntity("hive_db");
+
+                dbEntity.setAttribute(AtlasClient.NAME, dbName);
+                
dbEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), dbName));
+                dbEntity.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, 
getClusterName(stormConf));
+
+                entityExtInfo.addReferredEntity(dbEntity);
+
+                // todo: verify if hive table has everything needed to 
retrieve existing table
+                ret = new AtlasEntity("hive_table");
+
+                ret.setAttribute(AtlasClient.NAME, tblName);
+                ret.setAttribute(ATTRIBUTE_DB, 
AtlasTypeUtil.getAtlasObjectId(dbEntity));
+                ret.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
tblQualifiedName);
+            }
+            break;
 
             default:
                 // custom node - create a base dataset class with name 
attribute
                 //TODO - What should we do for custom data sets. Not sure what 
name we can set here?
                 return null;
         }
-        dependentEntities.add(dataSetReferenceable);
-
-
-        return dataSetReferenceable;
-    }
 
-    private String extractComponentClusterName(Configuration configuration, 
Map stormConf) {
-        String clusterName = 
configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
-        if (clusterName == null) {
-            clusterName = getClusterName(stormConf);
+        if (ret != null) {
+            entityExtInfo.addReferredEntity(ret);
         }
-        return clusterName;
-    }
 
+        return ret;
+    }
 
-    private ArrayList<Referenceable> createTopologyGraph(StormTopology 
stormTopology,
-                                                         Map<String, 
SpoutSpec> spouts,
-                                                         Map<String, Bolt> 
bolts) {
+    private List<AtlasEntity> createTopologyGraph(StormTopology stormTopology, 
Map<String, SpoutSpec> spouts, Map<String, Bolt> bolts) {
         // Add graph of nodes in the topology
-        final Map<String, Referenceable> nodeEntities = new HashMap<>();
+        Map<String, AtlasEntity> nodeEntities = new HashMap<>();
+
         addSpouts(spouts, nodeEntities);
         addBolts(bolts, nodeEntities);
 
         addGraphConnections(stormTopology, nodeEntities);
 
-        ArrayList<Referenceable> nodes = new ArrayList<>();
-        nodes.addAll(nodeEntities.values());
-        return nodes;
+        return new ArrayList<>(nodeEntities.values());
     }
 
-    private void addSpouts(Map<String, SpoutSpec> spouts,
-                           Map<String, Referenceable> nodeEntities) {
+    private void addSpouts(Map<String, SpoutSpec> spouts, Map<String, 
AtlasEntity> nodeEntities) {
         for (Map.Entry<String, SpoutSpec> entry : spouts.entrySet()) {
-            final String spoutName = entry.getKey();
-            Referenceable spoutReferenceable = createSpoutInstance(
-                    spoutName, entry.getValue());
-            nodeEntities.put(spoutName, spoutReferenceable);
-        }
-    }
+            String      spoutName = entry.getKey();
+            AtlasEntity spout     = createSpoutInstance(spoutName, 
entry.getValue());
 
-    private Referenceable createSpoutInstance(String spoutName,
-                                              SpoutSpec stormSpout) {
-        Referenceable spoutReferenceable = new 
Referenceable(StormDataTypes.STORM_SPOUT.getName());
-        spoutReferenceable.set(AtlasClient.NAME, spoutName);
-
-        Serializable instance = Utils.javaDeserialize(
-                stormSpout.get_spout_object().get_serialized_java(), 
Serializable.class);
-        spoutReferenceable.set("driverClass", instance.getClass().getName());
-
-        Map<String, String> flatConfigMap = 
StormTopologyUtil.getFieldValues(instance, true, null);
-        spoutReferenceable.set("conf", flatConfigMap);
-
-        return spoutReferenceable;
+            nodeEntities.put(spoutName, spout);
+        }
     }
 
-    private void addBolts(Map<String, Bolt> bolts,
-                          Map<String, Referenceable> nodeEntities) {
+    private void addBolts(Map<String, Bolt> bolts, Map<String, AtlasEntity> 
nodeEntities) {
         for (Map.Entry<String, Bolt> entry : bolts.entrySet()) {
-            Referenceable boltInstance = createBoltInstance(entry.getKey(), 
entry.getValue());
-            nodeEntities.put(entry.getKey(), boltInstance);
+            String      boltName     = entry.getKey();
+            AtlasEntity boltInstance = createBoltInstance(boltName, 
entry.getValue());
+
+            nodeEntities.put(boltName, boltInstance);
         }
     }
 
-    private Referenceable createBoltInstance(String boltName,
-                                             Bolt stormBolt) {
-        Referenceable boltReferenceable = new 
Referenceable(StormDataTypes.STORM_BOLT.getName());
+    private AtlasEntity createSpoutInstance(String spoutName, SpoutSpec 
stormSpout) {
+        AtlasEntity         spout         = new 
AtlasEntity(StormDataTypes.STORM_SPOUT.getName());
+        Serializable        instance      = 
Utils.javaDeserialize(stormSpout.get_spout_object().get_serialized_java(), 
Serializable.class);
+        Map<String, String> flatConfigMap = 
StormTopologyUtil.getFieldValues(instance, true, null);
 
-        boltReferenceable.set(AtlasClient.NAME, boltName);
+        spout.setAttribute(AtlasClient.NAME, spoutName);
+        spout.setAttribute("driverClass", instance.getClass().getName());
+        spout.setAttribute("conf", flatConfigMap);
 
-        Serializable instance = Utils.javaDeserialize(
-                stormBolt.get_bolt_object().get_serialized_java(), 
Serializable.class);
-        boltReferenceable.set("driverClass", instance.getClass().getName());
+        return spout;
+    }
 
+    private AtlasEntity createBoltInstance(String boltName, Bolt stormBolt) {
+        AtlasEntity         bolt          = new 
AtlasEntity(StormDataTypes.STORM_BOLT.getName());
+        Serializable        instance      = 
Utils.javaDeserialize(stormBolt.get_bolt_object().get_serialized_java(), 
Serializable.class);
         Map<String, String> flatConfigMap = 
StormTopologyUtil.getFieldValues(instance, true, null);
-        boltReferenceable.set("conf", flatConfigMap);
 
-        return boltReferenceable;
+        bolt.setAttribute(AtlasClient.NAME, boltName);
+        bolt.setAttribute("driverClass", instance.getClass().getName());
+        bolt.setAttribute("conf", flatConfigMap);
+
+        return bolt;
     }
 
-    private void addGraphConnections(StormTopology stormTopology,
-                                     Map<String, Referenceable> nodeEntities) {
+    private void addGraphConnections(StormTopology stormTopology, Map<String, 
AtlasEntity> nodeEntities) {
         // adds connections between spouts and bolts
-        Map<String, Set<String>> adjacencyMap =
-                StormTopologyUtil.getAdjacencyMap(stormTopology, true);
+        Map<String, Set<String>> adjacencyMap = 
StormTopologyUtil.getAdjacencyMap(stormTopology, true);
 
         for (Map.Entry<String, Set<String>> entry : adjacencyMap.entrySet()) {
-            String nodeName = entry.getKey();
+            String      nodeName      = entry.getKey();
             Set<String> adjacencyList = adjacencyMap.get(nodeName);
-            if (adjacencyList == null || adjacencyList.isEmpty()) {
+
+            if (CollectionUtils.isEmpty(adjacencyList)) {
                 continue;
             }
 
             // add outgoing links
-            Referenceable node = nodeEntities.get(nodeName);
-            ArrayList<String> outputs = new ArrayList<>(adjacencyList.size());
+            AtlasEntity  node    = nodeEntities.get(nodeName);
+            List<String> outputs = new ArrayList<>(adjacencyList.size());
+
             outputs.addAll(adjacencyList);
-            node.set("outputs", outputs);
+            node.setAttribute("outputs", outputs);
 
             // add incoming links
             for (String adjacentNodeName : adjacencyList) {
-                Referenceable adjacentNode = 
nodeEntities.get(adjacentNodeName);
+                AtlasEntity adjacentNode = nodeEntities.get(adjacentNodeName);
                 @SuppressWarnings("unchecked")
-                ArrayList<String> inputs = (ArrayList<String>) 
adjacentNode.get("inputs");
+                List<String> inputs = (List<String>) 
adjacentNode.getAttribute("inputs");
+
                 if (inputs == null) {
                     inputs = new ArrayList<>();
                 }
+
                 inputs.add(nodeName);
-                adjacentNode.set("inputs", inputs);
+                adjacentNode.setAttribute("inputs", inputs);
             }
         }
     }
 
     public static String getKafkaTopicQualifiedName(String clusterName, String 
topicName) {
-        return String.format("%s@%s", topicName, clusterName);
+        return String.format("%s@%s", topicName.toLowerCase(), clusterName);
     }
 
     public static String getHbaseTableQualifiedName(String clusterName, String 
nameSpace, String tableName) {
-        return String.format("%s.%s@%s", nameSpace, tableName, clusterName);
+        return String.format("%s.%s@%s", nameSpace.toLowerCase(), 
tableName.toLowerCase(), clusterName);
     }
 
     public static String getHdfsPathQualifiedName(String clusterName, String 
hdfsPath) {
-        return String.format("%s@%s", hdfsPath, clusterName);
+        return String.format("%s@%s", hdfsPath.toLowerCase(), clusterName);
     }
 
    /**
     * Resolves the cluster name used to qualify entities.
     *
     * <p>Reads {@code AtlasConstants.CLUSTER_NAME_KEY} from the hook's Atlas
     * configuration, falling back to {@code AtlasConstants.DEFAULT_CLUSTER_NAME}.
     *
     * @param stormConf Storm topology configuration; currently unused — the
     *                  value comes from atlasProperties only
     * @return the configured cluster name, never null
     */
    private String getClusterName(Map stormConf) {
        return atlasProperties.getString(AtlasConstants.CLUSTER_NAME_KEY, AtlasConstants.DEFAULT_CLUSTER_NAME);
    }
+
+    private String extractComponentClusterName(Configuration configuration, 
Map stormConf) {
+        String clusterName = 
configuration.get(AtlasConstants.CLUSTER_NAME_KEY, null);
+
+        if (clusterName == null) {
+            clusterName = getClusterName(stormConf);
+        }
+
+        return clusterName;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/54c31d5c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1934abf..a6d1268 100644
--- a/pom.xml
+++ b/pom.xml
@@ -570,6 +570,8 @@
         <aopalliance.version>1.0</aopalliance.version>
         <jackson.version>2.9.2</jackson.version>
         <commons-conf.version>1.10</commons-conf.version>
+        <commons-conf2.version>2.2</commons-conf2.version>
+
         <commons-collections.version>3.2.2</commons-collections.version>
         <commons-logging.version>1.1.3</commons-logging.version>
         <commons-lang.version>2.6</commons-lang.version>
@@ -582,6 +584,10 @@
                <maven-site-plugin.version>3.7</maven-site-plugin.version>
                <doxia.version>1.8</doxia.version>
         <dropwizard-metrics>3.2.2</dropwizard-metrics>
+        <!-- hadoop.hdfs-client.version should be kept the same as the hadoop version -->
+        <hadoop.hdfs-client.version>2.8.1</hadoop.hdfs-client.version>
+        
<codehaus.woodstox.stax2-api.version>3.1.4</codehaus.woodstox.stax2-api.version>
+        <woodstox-core.version>5.0.3</woodstox-core.version>
 
         <PermGen>64m</PermGen>
         <MaxPermGen>512m</MaxPermGen>

Reply via email to