NIFI-4818: Fix transit URL parsing at Hive2JDBC and KafkaTopic for ReportLineageToAtlas

- Hive2JDBC: Handle connection parameters and multiple host entries correctly
- KafkaTopic: Handle multiple host entries correctly
- Avoid potential "IllegalStateException: Duplicate key" exception when NiFiAtlasHook analyzes existing NiFiFlowPath input/output entries
- This closes #2435
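
For reference, the "Duplicate key" fix relies on the three-argument Collectors.toMap overload, which accepts a merge function instead of throwing when two tuples map to the same key. A minimal standalone sketch of that pattern follows; the class name and sample data are illustrative, not taken from the NiFi codebase:

    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToMapMergeSketch {
        public static void main(String[] args) {
            // "path1" appears twice; the two-argument toMap would throw
            // "IllegalStateException: Duplicate key" here. The merge function
            // resolves the collision by keeping the newer value, as the patch does.
            final Map<String, String> byQualifiedName = Stream.of(
                    new String[]{"path1", "oldRef"},
                    new String[]{"path2", "ref"},
                    new String[]{"path1", "newRef"})
                .collect(Collectors.toMap(e -> e[0], e -> e[1],
                        (oldValue, newValue) -> newValue));
            System.out.println(byQualifiedName.get("path1")); // newRef
        }
    }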


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a0bd6113
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a0bd6113
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a0bd6113

Branch: refs/heads/HDF-3.1-maint
Commit: a0bd61138a095217d3092e3bb29ab6971d019db0
Parents: 0ddd0b1
Author: Koji Kawamura <ijokaruma...@apache.org>
Authored: Thu Jan 25 13:57:01 2018 +0900
Committer: Matt Gilman <matt.c.gil...@gmail.com>
Committed: Wed Feb 7 09:01:41 2018 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/atlas/NiFiAtlasHook.java    |   6 +-
 .../atlas/provenance/analyzer/Hive2JDBC.java    |  46 +++++--
 .../atlas/provenance/analyzer/KafkaTopic.java   |  10 +-
 .../provenance/analyzer/TestHive2JDBC.java      | 133 +++++++++++++++++++
 .../provenance/analyzer/TestKafkaTopic.java     |   3 +-
 5 files changed, 179 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a0bd6113/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
index a15c935..58945d5 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasHook.java
@@ -255,7 +255,11 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
             }
             return new Tuple<>(refQualifiedName, typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName)));
         }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
-                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
+                // If duplication happens, use new value.
+                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> {
+                    logger.warn("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}", new Object[]{oldValue, newValue});
+                    return newValue;
+                }));
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/nifi/blob/a0bd6113/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
index e393a09..ccbbc66 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
@@ -21,10 +21,14 @@ import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.StringUtils;
 import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.net.URI;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_INPUT_TABLES;
 import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES;
@@ -49,17 +53,41 @@ import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.par
  */
 public class Hive2JDBC extends AbstractHiveAnalyzer {
 
+    private static final Logger logger = LoggerFactory.getLogger(Hive2JDBC.class);
+
+    // jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list
+    // Group 1 = <host1>:<port1>,<host2>:<port2>
+    // Group 2 = dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list
+    private static final String URI_PATTERN_STR = "jdbc:hive2://([^/]+)/?(.*)$";
+    private static final Pattern URI_PATTERN = Pattern.compile(URI_PATTERN_STR);
+
     @Override
     public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {
 
         // Replace the colon so that the schema in the URI can be parsed correctly.
-        final String transitUri = event.getTransitUri().replaceFirst("^jdbc:hive2", "jdbc-hive2");
-        final URI uri = parseUri(transitUri);
-        final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost());
-        // Remove the heading '/'
-        final String path = uri.getPath();
-        // If uri does not contain database name, then use 'default' as database name.
-        final String connectedDatabaseName = path == null || path.isEmpty() ? "default" : path.substring(1);
+        final String transitUri = event.getTransitUri();
+        if (transitUri == null) {
+            return null;
+        }
+
+        final Matcher uriMatcher = URI_PATTERN.matcher(transitUri);
+        if (!uriMatcher.matches()) {
+            logger.warn("Unexpected transit URI: {}", new Object[]{transitUri});
+            return null;
+        }
+
+        final String clusterName = context.getClusterResolver().fromHostNames(splitHostNames(uriMatcher.group(1)));
+        String connectedDatabaseName = null;
+        if (uriMatcher.groupCount() > 1) {
+            // Try to find connected database name from connection parameters.
+            final String[] connectionParams = uriMatcher.group(2).split(";");
+            connectedDatabaseName = connectionParams[0];
+        }
+
+        if (StringUtils.isEmpty(connectedDatabaseName)) {
+            // If not found, then use "default".
+            connectedDatabaseName = "default";
+        }
 
         final Set<Tuple<String, String>> inputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_INPUT_TABLES));
         final Set<Tuple<String, String>> outputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_OUTPUT_TABLES));
@@ -97,6 +125,6 @@ public class Hive2JDBC extends AbstractHiveAnalyzer {
 
     @Override
     public String targetTransitUriPattern() {
-        return "^jdbc:hive2://.+$";
+        return URI_PATTERN_STR;
     }
 }
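
To see what the new pattern captures, here is a small standalone sketch; the sample URLs are illustrative, not taken from the patch's tests. Group 1 carries the comma-separated host list that splitHostNames breaks apart, and group 2 carries the database name plus any semicolon-delimited connection parameters:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class Hive2UriPatternSketch {
        // Same pattern string the patch introduces in Hive2JDBC.
        private static final Pattern URI_PATTERN = Pattern.compile("jdbc:hive2://([^/]+)/?(.*)$");

        public static void main(String[] args) {
            final String[] samples = {
                    "jdbc:hive2://host1:10000/my_db",
                    "jdbc:hive2://zk0:2181,zk1:2181,zk2:2181/;serviceDiscoveryMode=zooKeeper"
            };
            for (String uri : samples) {
                final Matcher m = URI_PATTERN.matcher(uri);
                if (m.matches()) {
                    // Prints "hosts=host1:10000 rest=my_db" and
                    // "hosts=zk0:2181,zk1:2181,zk2:2181 rest=;serviceDiscoveryMode=zooKeeper"
                    System.out.println("hosts=" + m.group(1) + " rest=" + m.group(2));
                }
            }
        }
    }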

http://git-wip-us.apache.org/repos/asf/nifi/blob/a0bd6113/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
index e3d4709..ff86166 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
@@ -62,14 +62,8 @@ public class KafkaTopic extends AbstractNiFiProvenanceEventAnalyzer {
             return null;
         }
 
-        String clusterName = null;
-        for (String broker : uriMatcher.group(1).split(",")) {
-            final String brokerHostname = broker.split(":")[0].trim();
-            clusterName = context.getClusterResolver().fromHostNames(brokerHostname);
-            if (clusterName != null && !clusterName.isEmpty()) {
-                break;
-            }
-        }
+        final String[] hostNames = splitHostNames(uriMatcher.group(1));
+        final String clusterName = context.getClusterResolver().fromHostNames(hostNames);
 
         final String topicName = uriMatcher.group(2);
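
splitHostNames is a helper on the analyzers' shared base class (AbstractNiFiProvenanceEventAnalyzer) and its body is not part of this diff. Judging from the inlined loop it replaces, its contract is roughly the following; this sketch is an inference, not the actual NiFi implementation:

    import java.util.Arrays;

    public class SplitHostNamesSketch {
        // Hypothetical stand-in for the shared splitHostNames helper: split a
        // "host1:port1,host2:port2" list on commas and strip each entry to its
        // bare hostname. Inferred from the removed loop, not copied from NiFi.
        static String[] splitHostNames(final String hostNames) {
            return Arrays.stream(hostNames.split(","))
                    .map(hostName -> hostName.split(":")[0].trim())
                    .toArray(String[]::new);
        }

        public static void main(String[] args) {
            // Prints [0.example.com, 1.example.com]
            System.out.println(Arrays.toString(splitHostNames("0.example.com:6667,1.example.com:6667")));
        }
    }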
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a0bd6113/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
index 9e1a92c..f63d8ca 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
@@ -37,6 +37,7 @@ import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATT
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.matches;
 import static org.mockito.Mockito.when;
 
@@ -160,4 +161,136 @@ public class TestHive2JDBC {
         assertEquals("tableB1", ref.get(ATTR_NAME));
         assertEquals("databaseB.tableB1@cluster1", 
ref.get(ATTR_QUALIFIED_NAME));
     }
+
+    /**
+     * A Hive connection URL can have connection strings delimited by semicolons.
+     */
+    @Test
+    public void testTableLineageWithDefaultTableNameWithConnectionParams() {
+        final String processorName = "PutHiveQL";
+        final String transitUri = "jdbc:hive2://0.example.com:10000;transportMode=http;httpPath=cliservice";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(matches(".+\.example\.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(2, refs.getInputs().size());
+        // QualifiedName : Name
+        final Map<String, String> expectedInputRefs = new HashMap<>();
+        expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
+        expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
+        for (Referenceable ref : refs.getInputs()) {
+            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
+            assertTrue(expectedInputRefs.containsKey(qName));
+            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
+        }
+
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("hive_table", ref.getTypeName());
+        assertEquals("tableB1", ref.get(ATTR_NAME));
+        assertEquals("databaseB.tableB1@cluster1", 
ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    /**
+     * Hive connection URL can have multiple zookeeper host ports
+     * and multiple parameters delimited with semicolons.
+     * Database name can be omitted.
+     */
+    @Test
+    public void testTableLineageWithZookeeperDiscovery() {
+        final String processorName = "PutHiveQL";
+        final String transitUri = "jdbc:hive2://0.example.com:2181,1.example.com:2181,2.example.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"), eq("2.example.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(2, refs.getInputs().size());
+        // QualifiedName : Name
+        final Map<String, String> expectedInputRefs = new HashMap<>();
+        expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
+        expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
+        for (Referenceable ref : refs.getInputs()) {
+            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
+            assertTrue(expectedInputRefs.containsKey(qName));
+            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
+        }
+
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("hive_table", ref.getTypeName());
+        assertEquals("tableB1", ref.get(ATTR_NAME));
+        assertEquals("databaseB.tableB1@cluster1", 
ref.get(ATTR_QUALIFIED_NAME));
+    }
+
+    /**
+     * Hive connection URL using zookeeper and database name.
+     */
+    @Test
+    public void testTableLineageWithZookeeperDiscoverySpecificDatabase() {
+        final String processorName = "PutHiveQL";
+        final String transitUri = "jdbc:hive2://0.example.com:2181,1.example.com:2181/some_database;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2";
+        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
+        when(record.getComponentType()).thenReturn(processorName);
+        when(record.getTransitUri()).thenReturn(transitUri);
+        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
+        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
+        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
+        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");
+
+        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
+        when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1");
+
+        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
+        when(context.getClusterResolver()).thenReturn(clusterResolvers);
+
+        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
+        assertNotNull(analyzer);
+
+        final DataSetRefs refs = analyzer.analyze(context, record);
+        assertEquals(2, refs.getInputs().size());
+        // QualifiedName : Name
+        final Map<String, String> expectedInputRefs = new HashMap<>();
+        expectedInputRefs.put("some_database.tableA1@cluster1", "tableA1");
+        expectedInputRefs.put("some_database.tableA2@cluster1", "tableA2");
+        for (Referenceable ref : refs.getInputs()) {
+            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
+            assertTrue(expectedInputRefs.containsKey(qName));
+            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
+        }
+
+        assertEquals(1, refs.getOutputs().size());
+        Referenceable ref = refs.getOutputs().iterator().next();
+        assertEquals("hive_table", ref.getTypeName());
+        assertEquals("tableB1", ref.get(ATTR_NAME));
+        assertEquals("databaseB.tableB1@cluster1", 
ref.get(ATTR_QUALIFIED_NAME));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a0bd6113/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
index 5c0fd0e..543ac89 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
@@ -31,6 +31,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.matches;
 import static org.mockito.Mockito.when;
 
@@ -73,7 +74,7 @@ public class TestKafkaTopic {
         when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
 
         final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
-        when(clusterResolvers.fromHostNames(matches(".+\.example\.com"))).thenReturn("cluster1");
+        when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1");
 
         final AnalysisContext context = Mockito.mock(AnalysisContext.class);
         when(context.getClusterResolver()).thenReturn(clusterResolvers);
