[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295292#comment-16295292
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/2335
  
@ijokarumawak all looks good after some local testing. Thanks for your 
diligence and the willingness to keep going on this until we got everything 
resolved! I have now merged this to master!


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
> Fix For: 1.5.0
>
>
> While Apache NiFi has provenance and event-level lineage support within its
> data flow, Apache Atlas also manages lineage between datasets and the
> processes that interact with them.
> It would be beneficial for users of both NiFi and Atlas to see end-to-end
> data lineage in the Atlas lineage graph, since some types of datasets, for
> example Kafka topics and Hive tables, are processed both by NiFi and by
> technologies around Atlas such as Storm, Falcon, or Sqoop.
> To make this integration happen, I propose a NiFi reporting task that
> analyzes the NiFi flow and then creates DataSet and Process entities in Atlas.
> The challenge is how to model NiFi flow dataset-level lineage within the
> Atlas lineage graph.
> If we just add a single NiFi process and connect every DataSet from/to it,
> the result would be too ambiguous, since it would not be clear which part of
> a NiFi flow actually interacts with a given dataset.
> But if we represent every NiFi processor as an independent process in Atlas,
> it would be too granular. Also, since NiFi already has detailed event-level
> lineage, we don't need the same level of detail in Atlas.
> Grouping certain processors in a NiFi flow into a single process in Atlas
> would give a good granularity.
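A minimal sketch of the grouping idea, assuming a flow is reduced to a map from processor id to downstream processor ids. The class `FlowPathGrouping` and its grouping rule (a processor joins its upstream neighbour's path only when that link is a plain chain: one incoming connection, from a processor with exactly one outgoing connection) are hypothetical illustrations, not the rule the reporting task actually implements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class FlowPathGrouping {

    /**
     * Groups processors into hypothetical flow paths.
     *
     * @param downstream processor id -> ids it connects to; every processor must appear as a key
     * @return path head id -> processor ids in that path (heads and members in id order)
     */
    static Map<String, List<String>> groupIntoPaths(Map<String, List<String>> downstream) {
        // Invert the adjacency so each processor knows its upstream neighbours.
        Map<String, List<String>> upstream = new HashMap<>();
        for (Map.Entry<String, List<String>> e : downstream.entrySet()) {
            for (String to : e.getValue()) {
                upstream.computeIfAbsent(to, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        Map<String, List<String>> paths = new TreeMap<>();
        for (String id : new TreeSet<>(downstream.keySet())) {
            String head = pathHead(id, upstream, downstream);
            paths.computeIfAbsent(head, k -> new ArrayList<>()).add(id);
        }
        return paths;
    }

    // Walks upstream while the link is a plain chain (one in, one out).
    // The first processor where that stops holding is the head of its path.
    private static String pathHead(String id, Map<String, List<String>> upstream,
                                   Map<String, List<String>> downstream) {
        Set<String> visited = new HashSet<>();
        String current = id;
        while (visited.add(current)) {
            List<String> ins = upstream.getOrDefault(current, List.of());
            if (ins.size() != 1) {
                return current; // a source (no input) or a merge point (several inputs)
            }
            String prev = ins.get(0);
            if (downstream.getOrDefault(prev, List.of()).size() != 1) {
                return current; // the upstream processor fans out, so this branch starts a path
            }
            current = prev;
        }
        // Pure cycle: pick the smallest id so every member agrees on the same head.
        return Collections.min(visited);
    }
}
```

For a flow where A feeds B and B fans out to C and D, A and B share one path while C and D each start their own, giving three Atlas processes instead of one (too coarse) or four (one per processor, too fine).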



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295289#comment-16295289
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2335




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-18 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295287#comment-16295287
 ] 

ASF subversion and git services commented on NIFI-3709:
---

Commit 9750cf2fcda0209e897e98e632f7e42f3f17e3d9 in nifi's branch 
refs/heads/master from [~markap14]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=9750cf2 ]

NIFI-3709: Added XMLENC to L&N file. Moved inclusion of nifi-atlas-nar to a 
profile named include-atlas. This closes #2335.




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-18 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295286#comment-16295286
 ] 

ASF subversion and git services commented on NIFI-3709:
---

Commit fc73c609240de81d7379bda7c281f064ebe02714 in nifi's branch 
refs/heads/master from [~ijokarumawak]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=fc73c60 ]

NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295163#comment-16295163
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/2335
  
@ijokarumawak yes, I believe your understanding is correct. Thanks for 
the clarifications! In terms of code, I am good with the PR. I just want to test 
this against a live instance; as soon as I can confirm that everything works, 
I will push to master.

However, I do see that this NAR alone is over 60 MB. For now, I think it 
would be best to add a Maven profile for activating it, so that those who 
don't need it don't have to pull in the extra weight. Is that OK with you?




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16294418#comment-16294418
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on the issue:

https://github.com/apache/nifi/pull/2335
  
@markap14 I found that ReportLineageToAtlas does get UNKNOWN provenance 
events when it queries lineage via 
`ProvenanceRepository.submitLineageComputation(final long eventId, final NiFiUser user)`. 
It uses the event hierarchies returned by that method, but to analyze each 
event's details fully, it also calls `ProvenanceEventRepository.getEvent(final long id)`, 
which does not require a user. That's why the reporting task worked even in 
a secured NiFi cluster.

I now understand why the NiFi framework has two interfaces for provenance 
events, i.e. ProvenanceRepository and ProvenanceEventRepository. 
`ProvenanceRepository` provides methods to be called in the context of the 
user who made the request. `ProvenanceEventRepository` is used outside of a 
user request and is intended for Reporting Tasks, etc.

Is my understanding correct? If so, does the approach of this PR look good now? 
Thanks!
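The two access paths described above can be modelled without NiFi on the classpath. This is a hypothetical sketch: `ProvenanceAccessSketch`, `UserScopedRepository`, `lineageView`, `getEvent`, and the `readableComponents` set are stand-ins invented here, not NiFi types. It only assumes, as the comment reports, that a user-scoped lineage query can hand back UNKNOWN events for data the user may not see, while the user-free lookup returns the full event.

```java
import java.util.Map;
import java.util.Set;

public class ProvenanceAccessSketch {

    enum EventType { RECEIVE, SEND, UNKNOWN }

    record Event(long id, EventType type, String componentId) {}

    // Hypothetical stand-in for a user-scoped repository (cf. ProvenanceRepository):
    // events on components the user cannot read come back with their type masked as UNKNOWN.
    static final class UserScopedRepository {
        private final Map<Long, Event> events;
        private final Set<String> readableComponents;

        UserScopedRepository(Map<Long, Event> events, Set<String> readableComponents) {
            this.events = events;
            this.readableComponents = readableComponents;
        }

        Event lineageView(long id) {
            Event e = events.get(id);
            if (e == null) {
                throw new IllegalArgumentException("No event " + id);
            }
            return readableComponents.contains(e.componentId())
                    ? e
                    : new Event(id, EventType.UNKNOWN, e.componentId());
        }
    }

    // Hypothetical stand-in for ProvenanceEventRepository.getEvent(long): no user, no masking.
    static Event getEvent(Map<Long, Event> events, long id) {
        return events.get(id);
    }
}
```

In these terms, the reporting task combines both paths: it takes the event hierarchy from the user-scoped lineage query, then re-reads each event id through the user-free lookup to get its full details.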




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292870#comment-16292870
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157257088
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+    private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class);
+    private final NiFiFlow nifiFlow;
+    private final ClusterResolver clusterResolver;
+    private final ProvenanceRepository provenanceRepository;
+
+    public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver,
+                                   ProvenanceRepository provenanceRepository) {
+        this.nifiFlow = nifiFlow;
+        this.clusterResolver = clusterResolver;
+        this.provenanceRepository = provenanceRepository;
+    }
+
+    @Override
+    public List<ConnectionStatus> findConnectionTo(String componentId) {
+        return nifiFlow.getIncomingRelationShips(componentId);
+    }
+
+    @Override
+    public List<ConnectionStatus> findConnectionFrom(String componentId) {
+        return nifiFlow.getOutgoingRelationShips(componentId);
+    }
+
+    @Override
+    public String getNiFiClusterName() {
+        return nifiFlow.getClusterName();
+    }
+
+    @Override
+    public ClusterResolver getClusterResolver() {
+        return clusterResolver;
+    }
+
+    private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) {
+        final ComputeLineageResult result = submission.getResult();
+        try {
+            if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
+                return result;
+            }
+            logger.warn("Lineage query for {} timed out.", new Object[]{eventId});
+        } catch (InterruptedException e) {
+            logger.warn("Lineage query for {} was interrupted due to {}.", new Object[]{eventId, e}, e);
+        } finally {
+            submission.cancel();
+        }
+
+        return null;
+    }
+
+    @Override
+    public ComputeLineageResult queryLineage(long eventId) {
+        final ComputeLineageSubmission submission = provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
+        return getLineageResult(eventId, submission);
+    }
+
+    public ComputeLineageResult findParents(long eventId) {
+        final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER);
+        return getLineageResult(eventId, submission);
+    }
+
+    // NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation
+    private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
--- End diff --
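The `getLineageResult` helper in the diff above follows a submit, bounded-wait, always-cancel pattern. The sketch below reproduces that pattern with only the JDK, using `java.util.concurrent.Future` as a stand-in for NiFi's `ComputeLineageSubmission`/`ComputeLineageResult` pair (an assumption made so the example runs without NiFi); `BoundedWait` and `awaitOrNull` are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWait {

    /**
     * Waits up to {@code timeout} for the submitted work, then always cancels it.
     * Returns the result, or null if the wait timed out, was interrupted, or the work failed.
     */
    static <T> T awaitOrNull(Future<T> submission, long timeout, TimeUnit unit) {
        try {
            return submission.get(timeout, unit);
        } catch (TimeoutException e) {
            return null; // the quoted code logs a warning at this point
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
            return null;
        } catch (ExecutionException e) {
            return null; // the work itself threw
        } finally {
            // Mirrors submission.cancel() in the quoted code; a no-op once the work has completed.
            submission.cancel(true);
        }
    }
}
```

One deliberate difference from the quoted code: on `InterruptedException` the sketch restores the thread's interrupt flag, which is the usual Java idiom when an interruption is handled rather than rethrown.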


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292857#comment-16292857
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on the issue:

https://github.com/apache/nifi/pull/2335
  
@markap14 @dmkoster Thank you very much for your informative review 
feedback! I've addressed all comments except confirming the provenance 
event-level authorization. I will keep digging into that.




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292747#comment-16292747
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157240784
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+    @Override
+    public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+        final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+        if (refs == null || (refs.isEmpty())) {
+            return;
+        }
+
+        if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
+            processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+        } else {
+            addDataSetRefs(nifiFlow, refs);
+        }
+
+    }
+
+    /**
+     * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event are received.
+     * Because such entity can not be created in advance while analyzing flow statically,
+     * as ReportingTask can not determine whether a component id is a RemoteGroupPort,
+     * since connectionStatus is the only available information in ReportingContext.
+     * ConnectionStatus only knows component id, component type is unknown.
+     * For example, there is no difference to tell if a connected component is a funnel or a RemoteGroupPort.
+     */
+    private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+        final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
+
+        // Create a RemoteInputPort Process.
+        // event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
+        // See NIFI-4571 for detail.
+        final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
+        final String portProcessId = event.getComponentId();
+
+        final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+        remotePortProcess.setName(event.getComponentType());
+        remotePortProcess.addProcessor(portProcessId);
+
+        // For RemoteInputPort, need to find the previous component connected to this port,
+        // which passed this particular FlowFile.
+        // That is only possible by calling lineage API.
+        if (isRemoteInputPort) {
+            final P
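
The component-type dispatch in `processEvent` and the input/output choice in `processRemotePortEvent` can be illustrated without NiFi or Atlas on the classpath. In this hypothetical sketch, `RemotePortSelection` and its `DataSetRefs` record are simplified stand-ins holding made-up qualified-name strings rather than Atlas `Referenceable`s, and `Optional` replaces `iterator().next()` so that an empty side yields no result instead of a `NoSuchElementException`.

```java
import java.util.Optional;
import java.util.Set;

public class RemotePortSelection {

    static final String REMOTE_INPUT_PORT = "Remote Input Port";
    static final String REMOTE_OUTPUT_PORT = "Remote Output Port";

    // Simplified stand-in for DataSetRefs: dataset qualified names only.
    record DataSetRefs(Set<String> inputs, Set<String> outputs) {}

    static boolean isRemotePort(String componentType) {
        return REMOTE_INPUT_PORT.equals(componentType) || REMOTE_OUTPUT_PORT.equals(componentType);
    }

    /**
     * A Remote Input Port sends FlowFiles to the remote instance, so the remote port dataset
     * is on the output side; a Remote Output Port receives, so it is on the input side.
     * Like iterator().next(), findFirst() on a Set picks an arbitrary element if there are several.
     */
    static Optional<String> remotePortDataSet(String componentType, DataSetRefs refs) {
        if (!isRemotePort(componentType)) {
            return Optional.empty(); // ordinary components go through addDataSetRefs instead
        }
        Set<String> side = REMOTE_INPUT_PORT.equals(componentType) ? refs.outputs() : refs.inputs();
        return side.stream().findFirst();
    }
}
```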

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292739#comment-16292739
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157238842
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
--- End diff --


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292728#comment-16292728
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157237190
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+    @Override
+    public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+        final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+        if (refs == null || refs.isEmpty()) {
+            return;
+        }
+
+        if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
+            processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+        } else {
+            addDataSetRefs(nifiFlow, refs);
+        }
+
+    }
+
+    /**
+     * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received.
+     * Such an entity cannot be created in advance while analyzing the flow statically, because the ReportingTask
+     * cannot determine whether a component id belongs to a RemoteGroupPort: ConnectionStatus is the only
+     * information available in the ReportingContext, and it knows component ids but not component types.
+     * For example, it cannot tell whether a connected component is a funnel or a RemoteGroupPort.
+     */
+    private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+        final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
+
+        // Create a RemoteInputPort Process.
+        // event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
+        // See NIFI-4571 for detail.
+        final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
+        final String portProcessId = event.getComponentId();
+
+        final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+        remotePortProcess.setName(event.getComponentType());
+        remotePortProcess.addProcessor(portProcessId);
+
+        // For RemoteInputPort, need to find the previous component connected to this port,
+        // which passed this particular FlowFile.
+        // That is only possible by calling lineage API.
+        if (isRemoteInputPort) {
+            final Prove
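
The routing in processEvent and the choice of remote port DataSet in processRemotePortEvent can be sketched in isolation. This is a simplified, hypothetical model, not the NiFi API: RemotePortRouting and its nested Refs class stand in for DataSetRefs, and DataSets are plain names instead of Atlas Referenceables. It assumes, as the diff does, that a Remote Input Port event carries the remote port DataSet among its outputs and a Remote Output Port event among its inputs.

```java
import java.util.List;

/** Hypothetical sketch of the event routing above; not part of NiFi. */
final class RemotePortRouting {

    static final String REMOTE_INPUT_PORT = "Remote Input Port";
    static final String REMOTE_OUTPUT_PORT = "Remote Output Port";

    /** Stand-in for DataSetRefs: names of input and output DataSets. */
    static final class Refs {
        final List<String> inputs;
        final List<String> outputs;

        Refs(List<String> inputs, List<String> outputs) {
            this.inputs = inputs;
            this.outputs = outputs;
        }

        boolean isEmpty() {
            return inputs.isEmpty() && outputs.isEmpty();
        }
    }

    /** Returns "skip", "remote-port" or "dataset" depending on how the event would be handled. */
    static String route(String componentType, Refs refs) {
        if (refs == null || refs.isEmpty()) {
            return "skip";
        }
        if (REMOTE_INPUT_PORT.equals(componentType) || REMOTE_OUTPUT_PORT.equals(componentType)) {
            return "remote-port";
        }
        return "dataset";
    }

    /**
     * A Remote Input Port sends FlowFiles to a remote NiFi, so the remote port DataSet
     * is an output; a Remote Output Port receives FlowFiles, so it is an input.
     */
    static String remotePortDataSet(String componentType, Refs refs) {
        final boolean isRemoteInputPort = REMOTE_INPUT_PORT.equals(componentType);
        final List<String> side = isRemoteInputPort ? refs.outputs : refs.inputs;
        if (side.isEmpty()) {
            throw new IllegalArgumentException("No remote port DataSet on the expected side");
        }
        return side.get(0);
    }
}
```

Unlike the diff, which calls iterator().next() directly, the sketch fails with an explicit message when the expected side is empty; whether the real analyzers can ever produce that case is not shown in this thread.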

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292724#comment-16292724
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157236574
  
--- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+    private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class);
+    private final NiFiFlow nifiFlow;
+    private final ClusterResolver clusterResolver;
+    private final ProvenanceRepository provenanceRepository;
+
+    public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver,
+                                   ProvenanceRepository provenanceRepository) {
+        this.nifiFlow = nifiFlow;
+        this.clusterResolver = clusterResolver;
+        this.provenanceRepository = provenanceRepository;
+    }
+
+    @Override
+    public List<ConnectionStatus> findConnectionTo(String componentId) {
+        return nifiFlow.getIncomingRelationShips(componentId);
+    }
+
+    @Override
+    public List<ConnectionStatus> findConnectionFrom(String componentId) {
+        return nifiFlow.getOutgoingRelationShips(componentId);
+    }
+
+    @Override
+    public String getNiFiClusterName() {
+        return nifiFlow.getClusterName();
+    }
+
+    @Override
+    public ClusterResolver getClusterResolver() {
+        return clusterResolver;
+    }
+
+    private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) {
+        final ComputeLineageResult result = submission.getResult();
+        try {
+            if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
+                return result;
+            }
+            logger.warn("Lineage query for {} timed out.", eventId);
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so that callers can observe it.
+            Thread.currentThread().interrupt();
+            logger.warn("Lineage query for {} was interrupted due to {}.", eventId, e, e);
+        } finally {
+            submission.cancel();
+        }
+
+        return null;
+    }
+
+    @Override
+    public ComputeLineageResult queryLineage(long eventId) {
+        final ComputeLineageSubmission submission = provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
+        return getLineageResult(eventId, submission);
+    }
+
+    public ComputeLineageResult findParents(long eventId) {
+        final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER);
+        return getLineageResult(eventId, submission);
+    }
+
+    // NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation
+    private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
--- End diff --
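
getLineageResult follows a common pattern: wait for an asynchronous computation with an upper bound, always cancel the submission in finally, and return null on timeout or interruption. The sketch below shows the same pattern with java.util.concurrent.Future instead of NiFi's ComputeLineageSubmission; BoundedWait and awaitOrNull are hypothetical names. It also restores the thread's interrupt status on interruption, which is the usual Java convention.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of a bounded wait that always cancels; not part of NiFi. */
final class BoundedWait {

    static <T> T awaitOrNull(Future<T> submission, long timeout, TimeUnit unit) {
        try {
            return submission.get(timeout, unit);
        } catch (TimeoutException e) {
            return null; // the caller would log "timed out"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        } catch (ExecutionException e) {
            return null; // the computation itself failed
        } finally {
            // Cancelling an already completed future has no effect;
            // an unfinished one is abandoned so it does not keep running.
            submission.cancel(true);
        }
    }
}
```

For example, a CompletableFuture created with CompletableFuture.completedFuture(...) returns its value immediately, while a future that never completes yields null after the timeout and ends up cancelled.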


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292712#comment-16292712
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157234799
  
--- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.AtlasNiFiFlowLineage/additionalDetails.html ---
@@ -0,0 +1,541 @@
+
+
+
+
+
+AtlasNiFiFlowLineage
+
+
+
+
+AtlasNiFiFlowLineage
+
+Table of contents:
+
+
+Information reported to Atlas
+NiFi Atlas Types
+Cluster Name Resolution
+NiFi flow structure
+
+Path Separation Logic
+
+
+NiFi data lineage
+
+NiFi Lineage Strategy
+NiFi Provenance Event Analysis
+Supported DataSets and Processors
+
+
+How it runs in NiFi cluster
+Limitations
+Atlas Server Configurations
+Atlas Server Emulator
+
+
+Information reported to Atlas
+This reporting task stores two types of NiFi flow information: 'NiFi flow structure' and 'NiFi data lineage'.
+
+'NiFi flow structure' describes which components are running within a NiFi flow and how they are connected. It is reported by analyzing the current NiFi flow structure, specifically the relationships between NiFi components.
+
+'NiFi data lineage' describes which parts of a NiFi flow interact with different DataSets, such as HDFS files or Hive tables. It is reported by analyzing NiFi provenance events.
+
+
+
+Technically, each type of information is sent using a different protocol: Atlas REST API v2 and notifications via a Kafka topic, as shown in the image above.
+
+
+As both information types use the same NiFi Atlas Types and Cluster Name Resolution concepts, it is recommended to read those sections first.
+
+NiFi Atlas Types
+
+When it runs, this reporting task creates the following NiFi-specific types in the Atlas type system if their definitions are not found.
+
+Green boxes represent sub-types of DataSet and blue ones are sub-types of Process. Gray lines represent entity ownership.
+Red lines represent lineage.
+
+
+
+
+nifi_flow
+Represents a NiFi data flow.
+As shown in the above diagram, nifi_flow owns other nifi_component types.
+This ownership is defined by the Atlas 'owned' constraint, so that when a 'nifi_flow' entity is removed, all owned NiFi component entities are removed in a cascading manner.
+When this reporting task runs, it analyzes and traverses the entire flow structure, and creates NiFi component entities in Atlas.
+On later runs, it compares the current flow structure with the one stored in Atlas to determine whether any changes have been made since the flow was last reported. The reporting task updates NiFi component entities in Atlas if needed.
+NiFi components that are removed from a NiFi flow also get deleted from Atlas.
+However, those entities can still be seen in Atlas search results or lineage graphs, since Atlas uses 'Soft Delete' by default.
+See Atlas Delete Handler for further detail.
+
+Attributes:
+
+qualifiedName: Root ProcessGroup ID@clusterName (e.g. 86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1)
+name: Name of the Root ProcessGroup.
+url: URL of the NiFi instance. This can be specified via the reporting task's 'NiFi URL for Atlas' property.
+
+
+
+nifi_flow_path Part of a NiFi data flow containing one or more processing NiFi components, such as Processors and RemoteGroupPorts. The reporting task divides a NiFi flow into multiple flow paths. See Path Separation Logic for details.
+Attributes:
+
+qualifiedName: First NiFi component ID in the path@clusterName (e.g. 529e6722-9b49-3b66-9c94-00da9863ca2d@cl1)
+name: Names of the NiFi components within the path, concatenated (e.g. GenerateFlowFile, PutFile, LogAttribute)
+ 
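
The comparison described for nifi_flow (current flow versus the entities stored in Atlas) amounts to classifying each qualifiedName. A minimal sketch, assuming each entity can be reduced to a single comparable value such as its name: FlowDiff and its ChangeType enum are hypothetical, although the constant names mirror the EntityChangeType values imported by NiFiAtlasClient in this pull request.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

/** Hypothetical sketch of flow-structure change detection; not part of NiFi. */
final class FlowDiff {

    enum ChangeType { AS_IS, CREATED, UPDATED, DELETED }

    /**
     * @param current qualifiedName to compared value for the flow as it is now
     * @param stored  qualifiedName to compared value for the entities already in Atlas
     */
    static Map<String, ChangeType> diff(Map<String, String> current, Map<String, String> stored) {
        final Map<String, ChangeType> result = new TreeMap<>(); // sorted for stable output
        for (Map.Entry<String, String> e : current.entrySet()) {
            if (!stored.containsKey(e.getKey())) {
                result.put(e.getKey(), ChangeType.CREATED);
            } else if (Objects.equals(stored.get(e.getKey()), e.getValue())) {
                result.put(e.getKey(), ChangeType.AS_IS);
            } else {
                result.put(e.getKey(), ChangeType.UPDATED);
            }
        }
        for (String qualifiedName : stored.keySet()) {
            if (!current.containsKey(qualifiedName)) {
                // Removed from the NiFi flow; Atlas soft-deletes such entities by default.
                result.put(qualifiedName, ChangeType.DELETED);
            }
        }
        return result;
    }
}
```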

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292695#comment-16292695
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157232014
  
--- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+public class NiFiProvenanceEventAnalyzerFactory {
+
+    private static final Logger logger = LoggerFactory.getLogger(NiFiProvenanceEventAnalyzerFactory.class);
+    private static final Map<Pattern, NiFiProvenanceEventAnalyzer> analyzersForComponentType = new ConcurrentHashMap<>();
+    private static final Map<Pattern, NiFiProvenanceEventAnalyzer> analyzersForTransitUri = new ConcurrentHashMap<>();
+    private static final Map<ProvenanceEventType, NiFiProvenanceEventAnalyzer> analyzersForProvenanceEventType = new ConcurrentHashMap<>();
+    private static boolean loaded = false;
+
+    private static void loadAnalyzers() {
+        logger.debug("Loading NiFiProvenanceEventAnalyzer ...");
+        final ServiceLoader<NiFiProvenanceEventAnalyzer> serviceLoader
+                = ServiceLoader.load(NiFiProvenanceEventAnalyzer.class);
+        serviceLoader.forEach(analyzer -> {
+            addAnalyzer(analyzer.targetComponentTypePattern(), analyzersForComponentType, analyzer);
+            addAnalyzer(analyzer.targetTransitUriPattern(), analyzersForTransitUri, analyzer);
+            final ProvenanceEventType eventType = analyzer.targetProvenanceEventType();
+            if (eventType != null) {
+                if (analyzersForProvenanceEventType.containsKey(eventType)) {
+                    logger.warn("For ProvenanceEventType {}, an Analyzer {} is already assigned." +
+                                    " Only one analyzer for a type can be registered. Ignoring {}",
+                            eventType, analyzersForProvenanceEventType.get(eventType), analyzer);
+                } else {
+                    // Keep the first registered analyzer, as the warning above states.
+                    analyzersForProvenanceEventType.put(eventType, analyzer);
+                }
+            }
+        });
+        logger.info("Loaded NiFiProvenanceEventAnalyzers: componentTypes={}, transitUris={}", analyzersForComponentType, analyzersForTransitUri);
+    }
+
+    private static void addAnalyzer(String patternStr, Map<Pattern, NiFiProvenanceEventAnalyzer> toAdd,
+                                    NiFiProvenanceEventAnalyzer analyzer) {
+        if (patternStr != null && !patternStr.isEmpty()) {
+            Pattern pattern = Pattern.compile(patternStr.trim());
+            toAdd.put(pattern, analyzer);
+        }
+    }
+
+    /**
+     * Find and retrieve a NiFiProvenanceEventAnalyzer implementation for the specified targets.
+     * Pattern matching is performed in the following order, and the first match is returned:
+     * <ol>
+     * <li>Component type name. Use an analyzer supporting the component type with its {@link NiFiProvenanceEventAnalyzer#targetComponentTypePattern()}.</li>
+     * <li>TransitUri. Use an analyzer supporting the TransitUri with its {@link NiFiProvenanceEventAnalyzer#targetTransitUriPattern()}.</li>
+     * <li>Provenance Event Type. Use an analyzer supporting the Provenance Event Type with its {@link NiFiProvenanceEventAnalyzer#targetProvenanceEventType()}.</li>
+     * </ol>
+     * @param typeName NiFi component type name.
+     * @param transitUri Transit URI.
+     * @param eventType Provenance event type.
+     * @return Instance of NiFiProvenanceEventAnalyzer if one is found for the specified targets, otherwise null.
+     */
+ 
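
The lookup order documented in the javadoc above (component type pattern, then transit URI pattern, then provenance event type) can be illustrated with a small registry. This is a hypothetical sketch, not the factory's code: analyzers are plain strings, event types are strings rather than ProvenanceEventType, regexes are assumed to match the whole input via Matcher.matches(), and LinkedHashMap is used so that "first match" follows registration order, which the ConcurrentHashMap in the diff does not guarantee.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical sketch of a three-stage analyzer lookup; not part of NiFi. */
final class AnalyzerLookup {

    private final Map<Pattern, String> byComponentType = new LinkedHashMap<>();
    private final Map<Pattern, String> byTransitUri = new LinkedHashMap<>();
    private final Map<String, String> byEventType = new LinkedHashMap<>();

    void registerComponentType(String regex, String analyzer) {
        byComponentType.put(Pattern.compile(regex.trim()), analyzer);
    }

    void registerTransitUri(String regex, String analyzer) {
        byTransitUri.put(Pattern.compile(regex.trim()), analyzer);
    }

    /** Only one analyzer per event type: later registrations are ignored and return false. */
    boolean registerEventType(String eventType, String analyzer) {
        return byEventType.putIfAbsent(eventType, analyzer) == null;
    }

    private static String firstMatch(Map<Pattern, String> analyzers, String target) {
        if (target == null) {
            return null;
        }
        for (Map.Entry<Pattern, String> e : analyzers.entrySet()) {
            if (e.getKey().matcher(target).matches()) {
                return e.getValue();
            }
        }
        return null;
    }

    /** Returns the first analyzer found in lookup order, or null if none applies. */
    String find(String typeName, String transitUri, String eventType) {
        String analyzer = firstMatch(byComponentType, typeName);
        if (analyzer == null) {
            analyzer = firstMatch(byTransitUri, transitUri);
        }
        if (analyzer == null && eventType != null) {
            analyzer = byEventType.get(eventType);
        }
        return analyzer;
    }
}
```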

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292694#comment-16292694
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157231905
  
--- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java ---

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292663#comment-16292663
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157223751
  
--- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java ---

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292661#comment-16292661
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157222944
  
--- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+    private static NiFiAtlasClient nifiClient;
+    private AtlasClientV2 atlasClient;
+
+    private NiFiAtlasClient() {
+        super();
+    }
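
Entities in this integration are keyed by qualifiedNames of the form componentId@clusterName (see the nifi_flow and nifi_flow_path attributes in additionalDetails.html), and NiFiAtlasClient imports helpers such as AtlasUtils.getComponentIdFromQualifiedName for that purpose. Their bodies are not part of this thread, so QualifiedNames below is a hypothetical stand-in. It splits on the last '@', which assumes the cluster name itself contains no '@', and it builds a flow path name by joining component names with ", " as in the documented example.

```java
import java.util.List;

/** Hypothetical helpers for the "componentId@clusterName" convention; not NiFi's AtlasUtils. */
final class QualifiedNames {

    static String toQualifiedName(String clusterName, String componentId) {
        return componentId + "@" + clusterName;
    }

    /** Returns the part before the last '@', or the whole string when there is none. */
    static String componentId(String qualifiedName) {
        final int at = qualifiedName.lastIndexOf('@');
        return at < 0 ? qualifiedName : qualifiedName.substring(0, at);
    }

    /** Returns the part after the last '@', or null when there is none. */
    static String clusterName(String qualifiedName) {
        final int at = qualifiedName.lastIndexOf('@');
        return at < 0 ? null : qualifiedName.substring(at + 1);
    }

    /** Concatenates component names in path order, e.g. "GenerateFlowFile, PutFile". */
    static String flowPathName(List<String> componentNames) {
        return String.join(", ", componentNames);
    }
}
```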

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292658#comment-16292658
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157222062
  
--- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class is not thread-safe because it holds uncommitted notification messages within the instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} must be called serially from a single thread.
+ */
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
+
+    public static final String NIFI_USER = "nifi";
+
+    private static final Logger logger = LoggerFactory.getLogger(NiFIAtlasHook.class);
+    private static final String CONF_PREFIX = "atlas.hook.nifi.";
+    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
+
+    private final NiFiAtlasClient atlasClient;
+
+    /**
+     * An index to resolve a qualifiedName from a GUID.
+     */
+    private final Map<String, String> guidToQualifiedName;
+    /**
+     * An index to resolve a Referenceable from a typeName::qualifiedName.
+     */
+    private final Map<String, Referenceable> typedQualifiedNameToRef;
+
+
+    private static <K, V> Map<K, V> createCache(final int maxSize) {
+        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
--- End diff --

It's just a private method that creates cache instances within this class. 
So if different default optimizations are needed, we can change them here 
without affecting other code.
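For reference, passing `true` as the third `LinkedHashMap` constructor argument in `createCache` switches the map to access order, which is the usual basis for an LRU cache. The diff is cut off before the anonymous subclass body, so the following is only a minimal sketch assuming that body overrides `removeEldestEntry` to cap the size at `maxSize`; the class name `LruCacheSketch` and the sample keys are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LruCacheSketch {

    // Hypothetical stand-in for createCache: a size-bounded map that evicts
    // the least recently accessed entry once maxSize is exceeded.
    static <K, V> Map<K, V> createCache(final int maxSize) {
        // accessOrder = true: get() and put() move the touched entry to the end,
        // so the "eldest" entry is always the least recently used one.
        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Called after each put; returning true evicts the eldest entry.
                return size() > maxSize;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = createCache(2);
        cache.put("guid-1", "qn-1");
        cache.put("guid-2", "qn-2");
        cache.get("guid-1");           // touch guid-1, so guid-2 becomes the eldest
        cache.put("guid-3", "qn-3");   // exceeds maxSize, so guid-2 is evicted
        System.out.println(cache.keySet()); // [guid-1, guid-3]
    }
}
```

Tuning such as the initial capacity or the load factor (0.75f here) stays local to this one helper, which is why changing it would not affect other classes.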


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>
> While Apache NiFi has provenance and event level lineage support within its 
> data flow, Apache Atlas also does manage lineage between dataset and process 
> those inter

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292657#comment-16292657
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157221674
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class is not thread-safe as it holds uncommitted notification messages within instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
+ */
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
+
+public static final String NIFI_USER = "nifi";
--- End diff --

Yes, it would. However, this user is not significant at the moment. 
Please see this earlier conversation on the same subject:
https://github.com/apache/nifi/pull/2335#discussion_r156992235



[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292654#comment-16292654
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157221249
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class is not thread-safe as it holds uncommitted notification messages within instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
+ */
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
--- End diff --

Good catch, thank you! I'll fix this.



[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292652#comment-16292652
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157221167
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/AtlasUtils.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import org.apache.atlas.model.instance.AtlasObjectId;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+
+public class AtlasUtils {
+
+public static String toStr(Object obj) {
+return obj != null ? obj.toString() : null;
+}
+
+
+public static boolean isGuidAssigned(String guid) {
+return guid != null && !guid.startsWith("-");
--- End diff --

Yes, it relies on an Atlas internal implementation detail. However, a null 
check alone is not sufficient, because entities created on the client side but 
not yet registered in Atlas have negative GUIDs. I'd like to keep it this way 
if there are no strong objections. Thank you.
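To illustrate the distinction, the one-line check from `isGuidAssigned` can be run on its own. This is a standalone sketch, not the actual `AtlasUtils` class; the sample GUID values are made up, assuming (as described above) that an entity created on the client but not yet registered in Atlas carries a GUID starting with `-`.

```java
public class GuidCheckSketch {

    // Same logic as AtlasUtils.isGuidAssigned in the diff: a GUID counts as
    // assigned only if it exists and is not a negative client-side placeholder.
    public static boolean isGuidAssigned(String guid) {
        return guid != null && !guid.startsWith("-");
    }

    public static void main(String[] args) {
        // Made-up sample values for illustration only.
        String[] samples = {
            null,                                   // GUID never set
            "-1234567890",                          // assumed client-side placeholder
            "3f8e2c1a-0b5d-4e7f-9a2b-6c4d8e1f0a3b"  // UUID-like, registered-style value
        };
        for (String guid : samples) {
            System.out.println(guid + " -> " + isGuidAssigned(guid));
        }
    }
}
```

A plain `guid != null` test would wrongly treat the second sample as registered, which is the case the extra prefix check covers.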


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>
> While Apache NiFi has provenance and event level lineage support within its 
> data flow, Apache Atlas also does manage lineage between dataset and process 
> those interacting with such data. 
> It would be beneficial for users who use both NiFi and Atlas and if they can 
> see end-to-end data lineage on Atlas lineage graph, as some type of dataset 
> are processed by both NiFi and technologies around Atlas such as Storm, 
> Falcon or Sqoop. For example, Kafka topics and Hive tables.
> In order to make this integration happen, I propose a NiFi reporting task 
> that analyzes NiFi flow then creates DataSet and Process entities in Atlas.
> The challenge is how to design NiFi flow dataset level lineage within Atlas 
> lineage graph.
> If we just add a single NiFi process and connect every DataSet from/to it, it 
> would be too ambiguous since it won't be clear which part of a NiFi flow 
> actually interact with certain dataset.
> But if we put every NiFi processor as independent process in Atlas, it would 
> be too granular, too. Also, we already have detailed event level lineage in 
> NiFi, we wouldn't need the same level in Atlas.
> If we can group certain processors in a NiFi flow as a process in Atlas, it 
> would be a nice granularity.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292649#comment-16292649
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157220655
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292648#comment-16292648
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157219892
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class);
+private final NiFiFlow nifiFlow;
+private final ClusterResolver clusterResolver;
+private final ProvenanceRepository provenanceRepository;
+
+public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver,
+   ProvenanceRepository provenanceRepository) {
+this.nifiFlow = nifiFlow;
+this.clusterResolver = clusterResolver;
+this.provenanceRepository = provenanceRepository;
+}
+
+@Override
+public List findConnectionTo(String componentId) {
+return nifiFlow.getIncomingRelationShips(componentId);
+}
+
+@Override
+public List findConnectionFrom(String componentId) {
+return nifiFlow.getOutgoingRelationShips(componentId);
+}
+
+@Override
+public String getNiFiClusterName() {
+return nifiFlow.getClusterName();
+}
+
+@Override
+public ClusterResolver getClusterResolver() {
+return clusterResolver;
+}
+
+private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) {
+final ComputeLineageResult result = submission.getResult();
+try {
+if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
--- End diff --

I agree that this can be a costly operation. Provenance is queried here to 
compute a lineage from a DROP provenance event, which the 'Complete Path' 
strategy needs. I've documented the performance impact. If this does not work 
for a use case, the user can choose the other strategy, 'Simple Path', which 
does not query provenance events this way. It analyzes each event individually, 
so it should be more lightweight.
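The call to `result.awaitCompletion(10, TimeUnit.SECONDS)` bounds how long a single lineage query may block. The sketch below shows the same bounded-wait idea with `java.util.concurrent.CompletableFuture` standing in for NiFi's `ComputeLineageSubmission`/`ComputeLineageResult`. The class and method names are hypothetical, and because the diff is cut off before the timeout branch, returning an empty result on timeout is an assumption rather than what the PR necessarily does.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedLineageWaitSketch {

    // Hypothetical stand-in for a provenance lineage query: waits at most
    // timeoutMillis for the asynchronous result and returns an empty list
    // on timeout or failure, so the caller can move on instead of blocking.
    static List<Long> awaitLineage(CompletableFuture<List<Long>> submission, long timeoutMillis) {
        try {
            return submission.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            submission.cancel(true); // give up on this query
            return Collections.emptyList();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Collections.emptyList();
        } catch (ExecutionException e) {
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        // A query that has already completed.
        CompletableFuture<List<Long>> fast =
                CompletableFuture.completedFuture(Arrays.asList(1L, 2L, 3L));
        System.out.println(awaitLineage(fast, 100)); // [1, 2, 3]

        // A future that is never completed, so the bounded wait times out.
        CompletableFuture<List<Long>> slow = new CompletableFuture<>();
        System.out.println(awaitLineage(slow, 100)); // []
    }
}
```

The worst-case delay per event is the timeout itself, which is why a strategy that avoids these queries altogether, like 'Simple Path', scales better on busy flows.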



[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292640#comment-16292640
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157217791
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292639#comment-16292639
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157217462
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292626#comment-16292626
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157216281
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292625#comment-16292625
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157216102
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292614#comment-16292614
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157214953
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292612#comment-16292612
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157214860
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+    @Override
+    public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+        final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+        if (refs == null || (refs.isEmpty())) {
+            return;
+        }
+
+        if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
+            processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+        } else {
+            addDataSetRefs(nifiFlow, refs);
+        }
+
+    }
+
+    /**
+     * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received.
+     * Such an entity cannot be created in advance while analyzing the flow statically,
+     * because the ReportingTask cannot determine whether a component id belongs to a RemoteGroupPort:
+     * ConnectionStatus is the only available information in ReportingContext,
+     * and it knows only the component id, not the component type.
+     * For example, there is no way to tell whether a connected component is a funnel or a RemoteGroupPort.
+     */
+    private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+        final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
+
+        // Create a RemoteInputPort Process.
+        // event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
+        // See NIFI-4571 for detail.
+        final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
+        final String portProcessId = event.getComponentId();
+
+        final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+        remotePortProcess.setName(event.getComponentType());
+        remotePortProcess.addProcessor(portProcessId);
+
+        // For RemoteInputPort, need to find the previous component connected to this port,
+        // which passed this particular FlowFile.
+        // That is only possible by calling lineage API.
+        if (isRemoteInputPort) {
+            final P

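The javadoc for processRemotePortEvent explains that a flow_path for a RemoteGroupPort cannot be built during static flow analysis, because ConnectionStatus exposes only a component id; the path is instead created on demand once a SEND/RECEIVE event reveals the component type. A minimal sketch of that lazy, keyed creation, assuming simplified types: FlowPath, onRemotePortEvent and the string data-set names are hypothetical stand-ins for NiFiFlowPath, DataSetRefs and Referenceable, not the PR's implementation.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: lazily creates one flow path per remote-port component id
// once a provenance event reveals that the component is a remote port.
public class RemotePortFlowPathSketch {

    // Hypothetical stand-in for NiFiFlowPath; data sets are plain strings here.
    static final class FlowPath {
        final String id;
        final String name;
        final Set<String> processorIds = new LinkedHashSet<>();
        final Set<String> inputs = new LinkedHashSet<>();
        final Set<String> outputs = new LinkedHashSet<>();

        FlowPath(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Flow paths keyed by component id; remote-port paths are only added here, on demand.
    private final Map<String, FlowPath> flowPaths = new LinkedHashMap<>();

    FlowPath onRemotePortEvent(String componentId, String componentType, String portDataSet) {
        final boolean isRemoteInputPort = "Remote Input Port".equals(componentType);

        // Create the path on the first event for this component; reuse it afterwards.
        final FlowPath path = flowPaths.computeIfAbsent(componentId, id -> {
            final FlowPath created = new FlowPath(id, componentType);
            created.processorIds.add(id);
            return created;
        });

        // A Remote Input Port sends to the remote port, so the port is an output;
        // a Remote Output Port receives from it, so the port is an input.
        if (isRemoteInputPort) {
            path.outputs.add(portDataSet);
        } else {
            path.inputs.add(portDataSet);
        }
        return path;
    }

    public static void main(String[] args) {
        final RemotePortFlowPathSketch sketch = new RemotePortFlowPathSketch();
        final FlowPath first = sketch.onRemotePortEvent("rpg-port-1", "Remote Input Port", "remote-port-A");
        final FlowPath second = sketch.onRemotePortEvent("rpg-port-1", "Remote Input Port", "remote-port-A");
        System.out.println(first == second); // same path object is reused
        System.out.println(first.name + " outputs=" + first.outputs);
    }
}
```

Keying by component id mirrors the excerpt, where event.getComponentId() becomes the NiFiFlowPath id, and computeIfAbsent keeps repeated events for the same port from creating duplicate paths.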
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292582#comment-16292582
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157208859
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+    private static NiFiAtlasClient nifiClient;
+    private AtlasClientV2 atlasClient;
+
+    private NiFiAtlasClient() {
+        super();
+    }

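The NiFiAtlasClient excerpt pairs a private constructor with a static nifiClient field, which suggests a process-wide singleton; the accessor itself is not part of the quoted diff. As a sketch of one thread-safe way to expose such an instance, here is the initialization-on-demand holder idiom. LazySingletonSketch and its construction counter are hypothetical. If the field were instead assigned with double-checked locking, it would need to be declared volatile to be safely published.

```java
// Hypothetical sketch of the initialization-on-demand holder idiom.
public final class LazySingletonSketch {

    private static int constructions = 0;

    private LazySingletonSketch() {
        constructions++;
    }

    // The JVM initializes Holder, and therefore runs the constructor, only on the
    // first call to getInstance(); class initialization is guaranteed to be thread-safe.
    private static final class Holder {
        static final LazySingletonSketch INSTANCE = new LazySingletonSketch();
    }

    public static LazySingletonSketch getInstance() {
        return Holder.INSTANCE;
    }

    static int constructions() {
        return constructions;
    }

    public static void main(String[] args) {
        final LazySingletonSketch a = getInstance();
        final LazySingletonSketch b = getInstance();
        System.out.println(a == b);
        System.out.println(constructions());
    }
}
```

The holder avoids both a synchronized accessor and a volatile field, at the cost of not being able to pass configuration to the constructor; a client that needs Atlas URLs or credentials would still need a separate initialization step.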
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292577#comment-16292577
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157208324
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292576#comment-16292576
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157208066
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+private final Logger logger = 
LoggerFactory.getLogger(StandardAnalysisContext.class);
+private final NiFiFlow nifiFlow;
+private final ClusterResolver clusterResolver;
+private final ProvenanceRepository provenanceRepository;
+
+public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver 
clusterResolver,
+   ProvenanceRepository 
provenanceRepository) {
+this.nifiFlow = nifiFlow;
+this.clusterResolver = clusterResolver;
+this.provenanceRepository = provenanceRepository;
+}
+
+@Override
+public List findConnectionTo(String componentId) {
+return nifiFlow.getIncomingRelationShips(componentId);
+}
+
+@Override
+public List findConnectionFrom(String componentId) {
+return nifiFlow.getOutgoingRelationShips(componentId);
+}
+
+@Override
+public String getNiFiClusterName() {
+return nifiFlow.getClusterName();
+}
+
+@Override
+public ClusterResolver getClusterResolver() {
+return clusterResolver;
+}
+
+private ComputeLineageResult getLineageResult(long eventId, 
ComputeLineageSubmission submission) {
+final ComputeLineageResult result = submission.getResult();
+try {
+if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
+return result;
+}
+logger.warn("Lineage query for {} timed out.", new 
Object[]{eventId});
+} catch (InterruptedException e) {
+logger.warn("Lineage query for {} was interrupted due to {}.", 
new Object[]{eventId, e}, e);
+} finally {
+submission.cancel();
+}
+
+return null;
+}
+
+@Override
+public ComputeLineageResult queryLineage(long eventId) {
+final ComputeLineageSubmission submission = 
provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+public ComputeLineageResult findParents(long eventId) {
+final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+// NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation
+private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
--- End diff --
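getLineageResult above waits a bounded time (10 seconds) for the lineage computation and always cancels the submission in a finally block. The same pattern can be sketched with plain java.util.concurrent.Future standing in for NiFi's ComputeLineageSubmission/ComputeLineageResult, which are not reproduced here; awaitOrNull is a hypothetical name. One difference worth considering: the sketch restores the thread's interrupt flag when interrupted, which the catch block in the diff does not do.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWaitSketch {

    // Waits up to `timeout` for the result; returns null on timeout, interruption or failure.
    // The submission is always cancelled afterwards, mirroring the finally block in the diff.
    static <T> T awaitOrNull(Future<T> submission, long timeout, TimeUnit unit) {
        try {
            return submission.get(timeout, unit);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            return null;
        } finally {
            // Harmless if the computation already completed.
            submission.cancel(true);
        }
    }

    public static void main(String[] args) {
        Future<String> done = CompletableFuture.completedFuture("lineage");
        Future<String> never = new CompletableFuture<>();
        System.out.println(awaitOrNull(done, 10, TimeUnit.MILLISECONDS));  // lineage
        System.out.println(awaitOrNull(never, 10, TimeUnit.MILLISECONDS)); // null
        System.out.println(never.isCancelled());                           // true
    }
}
```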


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291502#comment-16291502
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user dmkoster commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157042460
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}
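NiFiAtlasClient statically imports AtlasUtils.getComponentIdFromQualifiedName and findIdByQualifiedName, whose bodies are not part of this excerpt. Assuming component entities use the same componentId@clusterName form that additionalDetails.html documents for nifi_flow (e.g. 86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1), building and splitting such names could look like the sketch below. All method names here are hypothetical, not the actual AtlasUtils API.

```java
final class QualifiedNameSketch {

    // Hypothetical builder for the "componentId@clusterName" form.
    static String toQualifiedName(String clusterName, String componentId) {
        return componentId + "@" + clusterName;
    }

    // Hypothetical counterpart of getComponentIdFromQualifiedName. NiFi component ids are UUIDs
    // and contain no '@', so the first '@' is taken as the separator.
    static String componentIdOf(String qualifiedName) {
        int at = qualifiedName.indexOf('@');
        return at < 0 ? qualifiedName : qualifiedName.substring(0, at);
    }

    // Returns null when the name carries no cluster part.
    static String clusterNameOf(String qualifiedName) {
        int at = qualifiedName.indexOf('@');
        return at < 0 ? null : qualifiedName.substring(at + 1);
    }

    public static void main(String[] args) {
        String qn = toQualifiedName("cl1", "86420a14-2fab-3e1e-4331-fb6ab42f58e0");
        System.out.println(qn);                // 86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1
        System.out.println(componentIdOf(qn)); // 86420a14-2fab-3e1e-4331-fb6ab42f58e0
        System.out.println(clusterNameOf(qn)); // cl1
    }
}
```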
   

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291498#comment-16291498
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user dmkoster commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157035283
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java
 ---
@@ -0,0 +1,294 @@
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class is not thread-safe as it holds uncommitted notification messages within instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
+ */
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
+
+public static final String NIFI_USER = "nifi";
+
+private static final Logger logger = LoggerFactory.getLogger(NiFIAtlasHook.class);
+private static final String CONF_PREFIX = "atlas.hook.nifi.";
+private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
+
+private final NiFiAtlasClient atlasClient;
+
+/**
+ * An index to resolve a qualifiedName from a GUID.
+ */
+private final Map<String, String> guidToQualifiedName;
+/**
+ * An index to resolve a Referenceable from a typeName::qualifiedName.
+ */
+private final Map<String, Referenceable> typedQualifiedNameToRef;
+
+
+private static <K, V> Map<K, V> createCache(final int maxSize) {
+return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
--- End diff --

The default load factor is currently 0.75. Is there a reason for fixing the 
value versus allowing future default optimizations?
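On the load-factor question: java.util.LinkedHashMap exposes access ordering only through the three-argument constructor LinkedHashMap(int initialCapacity, float loadFactor, boolean accessOrder), so an access-ordered (LRU-style) map must pass a load factor explicitly; 0.75f is the JDK's documented default. The body of createCache is cut off in the diff, so the removeEldestEntry override below is an assumption about its intent, shown as a minimal bounded LRU cache sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LruCacheSketch {

    static <K, V> Map<K, V> createCache(final int maxSize) {
        // accessOrder = true is only reachable via the 3-arg constructor,
        // which also requires an explicit load factor; 0.75f matches the JDK default.
        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Evict the least-recently-accessed entry once the cap is exceeded.
                return size() > maxSize;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = createCache(2);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.get("a");                      // touch "a" so "b" becomes the eldest entry
        cache.put("c", "3");                 // exceeds the cap and evicts "b"
        System.out.println(cache.keySet());  // [a, c]
    }
}
```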



[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291497#comment-16291497
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user dmkoster commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157034065
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java
 ---
@@ -0,0 +1,294 @@
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
+
+public static final String NIFI_USER = "nifi";
--- End diff --

Would this be best offered as a configurable value?
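One way to make the hook user configurable, sketched under assumptions: the key atlas.hook.nifi.user is hypothetical (the diff only defines CONF_PREFIX + "numRetries"), and java.util.Properties stands in for whatever configuration object the hook actually reads. The current constant "nifi" is kept as the fallback, so behavior is unchanged when the key is absent.

```java
import java.util.Properties;

final class HookUserConfigSketch {

    static final String CONF_PREFIX = "atlas.hook.nifi.";
    // Hypothetical key: only CONF_PREFIX + "numRetries" appears in the diff.
    static final String HOOK_USER = CONF_PREFIX + "user";
    // Current hard-coded value, kept as the fallback.
    static final String DEFAULT_USER = "nifi";

    // Returns the configured user, or DEFAULT_USER when the key is missing or blank.
    static String resolveUser(Properties conf) {
        String value = conf.getProperty(HOOK_USER);
        if (value == null || value.trim().isEmpty()) {
            return DEFAULT_USER;
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(resolveUser(conf)); // nifi
        conf.setProperty(HOOK_USER, "atlas-reporter");
        System.out.println(resolveUser(conf)); // atlas-reporter
    }
}
```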



[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291500#comment-16291500
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user dmkoster commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157047074
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+@Override
+public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+if (refs == null || refs.isEmpty()) {
+return;
+}
+
+if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
+processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+} else {
+addDataSetRefs(nifiFlow, refs);
+}
+
+}
+
+/**
+ * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received.
+ * Such an entity cannot be created in advance while analyzing the flow statically,
+ * because the ReportingTask cannot determine whether a component id belongs to a RemoteGroupPort:
+ * ConnectionStatus is the only available information in ReportingContext,
+ * and it knows only the component id, not the component type.
+ * For example, there is no way to tell whether a connected component is a funnel or a RemoteGroupPort.
+ */
+private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
+
+// Create a RemoteInputPort Process.
+// event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
+// See NIFI-4571 for detail.
+final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
+final String portProcessId = event.getComponentId();
+
+final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+remotePortProcess.setName(event.getComponentType());
+remotePortProcess.addProcessor(portProcessId);
+
+// For RemoteInputPort, need to find the previous component connected to this port,
+// which passed this particular FlowFile.
+// That is only possible by calling lineage API.
+if (isRemoteInputPort) {
+final Prove
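The component-type dispatch in processEvent and the choice of port DataSet in processRemotePortEvent can be modeled in isolation. In this sketch, Refs is a hypothetical stand-in for DataSetRefs holding plain strings instead of Referenceable objects. Unlike the diff, which calls iterator().next() directly and would throw NoSuchElementException if the expected side were empty, the sketch returns null in that case; whether that case can actually occur depends on the analyzers.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

final class RemotePortSketch {

    static final String REMOTE_INPUT_PORT = "Remote Input Port";
    static final String REMOTE_OUTPUT_PORT = "Remote Output Port";

    // Simplified stand-in for DataSetRefs: analyzed input/output dataset names.
    static final class Refs {
        final Set<String> inputs;
        final Set<String> outputs;

        Refs(Set<String> inputs, Set<String> outputs) {
            this.inputs = inputs;
            this.outputs = outputs;
        }

        boolean isEmpty() {
            return inputs.isEmpty() && outputs.isEmpty();
        }
    }

    static boolean isRemotePortEvent(String componentType) {
        return REMOTE_INPUT_PORT.equals(componentType) || REMOTE_OUTPUT_PORT.equals(componentType);
    }

    // A Remote Input Port sends data to the remote instance, so its port dataset is an output;
    // a Remote Output Port receives from it, so the dataset is an input.
    static String remotePortDataSet(String componentType, Refs refs) {
        Set<String> side = REMOTE_INPUT_PORT.equals(componentType) ? refs.outputs : refs.inputs;
        return side.isEmpty() ? null : side.iterator().next();
    }

    public static void main(String[] args) {
        Refs send = new Refs(Collections.emptySet(), new LinkedHashSet<>(Collections.singleton("remote-port-A")));
        System.out.println(isRemotePortEvent(REMOTE_INPUT_PORT));        // true
        System.out.println(remotePortDataSet(REMOTE_INPUT_PORT, send));  // remote-port-A
        System.out.println(remotePortDataSet(REMOTE_OUTPUT_PORT, send)); // null
    }
}
```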

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291499#comment-16291499
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user dmkoster commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157029294
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java
 ---
@@ -0,0 +1,294 @@
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
--- End diff --

The class name follows a different capitalization pattern than the others 
(e.g. NiFiAtlasClient, NiFiFlow). Perhaps a typo?
NiFIAtlasHook vs NiFiAtlasHook.



[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291501#comment-16291501
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user dmkoster commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157031810
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/AtlasUtils.java
 ---
@@ -0,0 +1,77 @@
+package org.apache.nifi.atlas;
+
+import org.apache.atlas.model.instance.AtlasObjectId;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+
+public class AtlasUtils {
+
+public static String toStr(Object obj) {
+return obj != null ? obj.toString() : null;
+}
+
+
+public static boolean isGuidAssigned(String guid) {
+return guid != null && !guid.startsWith("-");
--- End diff --

A GUID starting with a dash is an internal detail of the AtlasEntity 
class and might change. Checking for null should be sufficient.
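To make the trade-off concrete, here is a minimal sketch comparing the check in the diff with the null-only check suggested above. It assumes, as the existing code does, that an entity not yet persisted carries a placeholder GUID beginning with '-'; that convention is internal to Atlas and not guaranteed.

```java
final class GuidCheckSketch {

    // Check as written in the diff: a placeholder GUID starting with '-' counts as unassigned.
    static boolean isGuidAssignedStrict(String guid) {
        return guid != null && !guid.startsWith("-");
    }

    // Reviewer's suggestion: only a missing GUID counts as unassigned.
    static boolean isGuidAssignedNullOnly(String guid) {
        return guid != null;
    }

    public static void main(String[] args) {
        String[] samples = {null, "-1234567890", "9c3e0f5a-1b2c-4d3e-8f90-123456789abc"};
        for (String guid : samples) {
            System.out.println(guid + " strict=" + isGuidAssignedStrict(guid)
                    + " nullOnly=" + isGuidAssignedNullOnly(guid));
        }
    }
}
```

The two predicates disagree only on placeholder values such as "-1234567890"; if such entities can reach this check, the null-only test would treat them as already persisted.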





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291346#comment-16291346
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/2335
  
@ijokarumawak thanks for all of the work that you've put into this - it is 
very much a non-trivial effort! For the most part, the code looks good. I 
flagged a couple of minor things in the code, 1 or 2 thread-safety issues that 
should be easy to address.

The only 'more significant' concern that I have is the use of the 
dummied-up NiFiUser. As-is, this is an anonymous user and in a secured 
environment will not retrieve the event details that are necessary. It also 
means that we would be validating events against a user who doesn't even exist.

I think there are 2 ways to approach this: first, as I noted inline, we 
could have a property to define which user the queries should run on behalf of. 
So the user could add a "NiFi Atlas" user and use that. However, that's also a 
bit concerning because it means that whoever has access to edit the reporting 
task can run provenance queries on behalf of another user.

By far, my preference is to actually just update the ProvenanceRepository 
implementations (There are 4 now, I think) so that if a null User is passed in, 
we don't check permissions. This would mean that you can pass in null from 
Reporting Task. We could also then update the interface to have an overloaded 
method that does not require that a user be given.

Once that is addressed, I think it is a +1 from me from a code review 
perspective.

Thanks
-Mark
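The preferred option above (a null user means no permission check, plus an overload that takes no user) can be sketched in plain Java. Everything here is hypothetical: User, EventAuthorizer and the String return value stand in for NiFi's NiFiUser, its authorization machinery and ComputeLineageSubmission, and the real ProvenanceRepository signatures are not reproduced.

```java
import java.util.Objects;

final class ProvenanceQuerySketch {

    // Minimal stand-in for an authenticated user identity.
    static final class User {
        final String identity;

        User(String identity) {
            this.identity = Objects.requireNonNull(identity);
        }
    }

    // Hypothetical authorization hook; NiFi's real authorizer API is not modeled here.
    interface EventAuthorizer {
        boolean canRead(User user, long eventId);
    }

    private final EventAuthorizer authorizer;

    ProvenanceQuerySketch(EventAuthorizer authorizer) {
        this.authorizer = authorizer;
    }

    // Overload for trusted internal callers such as a reporting task: no user, no permission check.
    String submitLineageComputation(long eventId) {
        return submitLineageComputation(eventId, null);
    }

    // A null user skips authorization; a non-null user is checked as before.
    String submitLineageComputation(long eventId, User user) {
        if (user != null && !authorizer.canRead(user, eventId)) {
            throw new SecurityException(user.identity + " may not read provenance event " + eventId);
        }
        return "lineage-submission-" + eventId; // placeholder for the real submission object
    }

    public static void main(String[] args) {
        ProvenanceQuerySketch repo = new ProvenanceQuerySketch((user, eventId) -> false); // deny everyone
        System.out.println(repo.submitLineageComputation(42L)); // lineage-submission-42
        try {
            repo.submitLineageComputation(42L, new User("anonymous"));
        } catch (SecurityException e) {
            System.out.println("denied: " + e.getMessage());
        }
    }
}
```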







[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291319#comment-16291319
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157025778
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.AtlasNiFiFlowLineage/additionalDetails.html
 ---
@@ -0,0 +1,541 @@
+
+
+
+
+
+AtlasNiFiFlowLineage
+
+
+
+
+AtlasNiFiFlowLineage
+
+Table of contents:
+
+
+Information reported to Atlas
+NiFi Atlas Types
+Cluster Name Resolution
+NiFi flow structure
+
+Path Separation Logic
+
+
+NiFi data lineage
+
+NiFi Lineage Strategy
+NiFi Provenance Event Analysis
+Supported DataSets and Processors
+
+
+How it runs in NiFi cluster
+Limitations
+Atlas Server Configurations
+Atlas Server Emulator
+
+
+Information reported to Atlas
+This reporting task stores two types of NiFi flow information: 'NiFi flow structure' and 'NiFi data lineage'.
+
+'NiFi flow structure' describes which components are running within a NiFi flow and how they are connected. It is reported by analyzing the current NiFi flow structure, specifically the relationships between NiFi components.
+
+'NiFi data lineage' describes which parts of a NiFi flow interact with DataSets such as HDFS files or Hive tables. It is reported by analyzing NiFi provenance events.
+
+
+
+Technically each information is sent using different protocol, 
Atlas REST API v2, and Notification via a Kafka topic as shown in above 
image.
+
+
+As both information types use the same NiFi Atlas Types and Cluster Name Resolution concepts, it is recommended to 
start reading those sections first.
+
+NiFi Atlas Types
+
+When it runs, this reporting task creates the following NiFi-specific types in the Atlas type system if their definitions are not found.
+
+Green boxes represent sub-types of DataSet and blue ones are sub-types of Process. Gray lines represent entity ownership.
+Red lines represent lineage.
+
+
+
+
+nifi_flow
+Represents a NiFi data flow.
+As shown in the above diagram, nifi_flow owns other nifi_component types.
+This owning relationship is defined by the Atlas 'owned' constraint, so that when a 'nifi_flow' entity is removed, all owned NiFi component entities are removed in a cascading manner.
+When this reporting task runs, it analyzes and traverses the entire flow structure and creates NiFi component entities in Atlas.
+On later runs, it compares the current flow structure with the one stored in Atlas to determine whether any changes have been made since the flow was last reported, and updates NiFi component entities in Atlas as needed.
+NiFi components that are removed from a NiFi flow are also deleted from Atlas.
+However, those entities can still be seen in Atlas search results or lineage graphs, since Atlas uses 'Soft Delete' by default.
+See Atlas Delete Handler 
for further detail.
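The compare-and-update step described above is essentially a keyed set difference between the components in the running flow and those last stored in Atlas. Below is a minimal Java sketch under stated assumptions: each component is reduced to a hypothetical component-ID-to-name map, and a changed name stands in for "modified". The names FlowDiffSketch, Diff, and diff are illustrative and are not part of the reporting task's code.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class FlowDiffSketch {

    // Hypothetical result holder: component IDs to create, update, and delete in Atlas.
    static final class Diff {
        final Set<String> toCreate = new TreeSet<>();
        final Set<String> toUpdate = new TreeSet<>();
        final Set<String> toDelete = new TreeSet<>();
    }

    // current: components in the running flow; stored: components last reported to Atlas.
    // Both map component ID -> a simplified attribute (here just the component name).
    static Diff diff(Map<String, String> current, Map<String, String> stored) {
        Diff d = new Diff();
        for (Map.Entry<String, String> e : current.entrySet()) {
            String previous = stored.get(e.getKey());
            if (previous == null) {
                d.toCreate.add(e.getKey());          // not reported before
            } else if (!previous.equals(e.getValue())) {
                d.toUpdate.add(e.getKey());          // reported, but an attribute changed
            }
        }
        for (String id : stored.keySet()) {
            if (!current.containsKey(id)) {
                d.toDelete.add(id);                  // removed from the NiFi flow
            }
        }
        return d;
    }

    public static void main(String[] args) {
        Map<String, String> stored = Map.of("a", "GenerateFlowFile", "b", "PutFile", "d", "UpdateAttribute");
        Map<String, String> current = Map.of("a", "GenerateFlowFile", "b", "PutHDFS", "c", "LogAttribute");
        Diff d = diff(current, stored);
        // prints: create=[c] update=[b] delete=[d]
        System.out.println("create=" + d.toCreate + " update=" + d.toUpdate + " delete=" + d.toDelete);
    }
}
```

A real comparison would look at more attributes than the name; the sketch only shows how components fall into the create, update, and delete buckets.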
+
+Attributes:
+
+qualifiedName: Root ProcessGroup ID@clusterName (e.g. 86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1)
+name: Name of the Root ProcessGroup.
+url: URL of the NiFi instance. This can be specified via the reporting task's 'NiFi URL for Atlas' property.
+
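The qualifiedName format above (an ID, then '@', then the cluster name) can be built and taken apart as in the following sketch. The helper names are hypothetical. The parser splits at the last '@' and assumes the cluster name contains no '@' character.

```java
import java.util.Objects;

public class QualifiedNameSketch {

    // Builds "<id>@<clusterName>", e.g. a Root ProcessGroup ID plus the resolved cluster name.
    static String toQualifiedName(String id, String clusterName) {
        Objects.requireNonNull(id, "id");
        Objects.requireNonNull(clusterName, "clusterName");
        return id + "@" + clusterName;
    }

    // Splits a qualifiedName into {id, clusterName} at the last '@'.
    static String[] split(String qualifiedName) {
        int at = qualifiedName.lastIndexOf('@');
        if (at < 0) {
            throw new IllegalArgumentException("No '@' in " + qualifiedName);
        }
        return new String[] {qualifiedName.substring(0, at), qualifiedName.substring(at + 1)};
    }

    public static void main(String[] args) {
        String qn = toQualifiedName("86420a14-2fab-3e1e-4331-fb6ab42f58e0", "cl1");
        System.out.println(qn);              // 86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1
        System.out.println(split(qn)[1]);    // cl1
    }
}
```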
+
+
+nifi_flow_path Part of a NiFi data flow containing one 
or more processing NiFi components such as Processors and RemoteGroupPorts. The 
reporting task divides a NiFi flow into multiple flow paths. See Path Separation Logic for details.
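The nifi_flow_path attributes listed next can be derived from an ordered list of a path's components. For qualifiedName, the sketch below uses the ID of the first component in the path. For name, it concatenates the component names. The ", " separator is inferred from the example value. The Component class and the placeholder IDs "id-2" and "id-3" are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlowPathNamingSketch {

    // Hypothetical minimal component: just an ID and a display name.
    static final class Component {
        final String id;
        final String name;

        Component(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // qualifiedName: ID of the first component in the path, then "@" and the cluster name.
    static String qualifiedName(List<Component> path, String clusterName) {
        if (path.isEmpty()) {
            throw new IllegalArgumentException("A flow path must contain at least one component");
        }
        return path.get(0).id + "@" + clusterName;
    }

    // name: component names in path order, joined with ", ".
    static String name(List<Component> path) {
        return path.stream().map(c -> c.name).collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        List<Component> path = Arrays.asList(
                new Component("529e6722-9b49-3b66-9c94-00da9863ca2d", "GenerateFlowFile"),
                new Component("id-2", "PutFile"),
                new Component("id-3", "LogAttribute"));
        System.out.println(qualifiedName(path, "cl1")); // 529e6722-9b49-3b66-9c94-00da9863ca2d@cl1
        System.out.println(name(path));                 // GenerateFlowFile, PutFile, LogAttribute
    }
}
```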
+Attributes:
+
+qualifiedName: The first NiFi component ID in a path@clusterName (e.g. 529e6722-9b49-3b66-9c94-00da9863ca2d@cl1)
+name: NiFi component names within a path, concatenated (e.g. GenerateFlowFile, PutFile, LogAttribute)
+u

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291212#comment-16291212
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157009606
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.St

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291193#comment-16291193
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157005913
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291192#comment-16291192
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157005524
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291190#comment-16291190
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157005140
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291185#comment-16291185
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157004365
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+    @Override
+    public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+        final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+        if (refs == null || (refs.isEmpty())) {
+            return;
+        }
+
+        if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
+            processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+        } else {
+            addDataSetRefs(nifiFlow, refs);
+        }
+
+    }
+
+    /**
+     * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received.
+     * Such an entity cannot be created in advance while analyzing the flow statically,
+     * because the ReportingTask cannot determine whether a component ID is a RemoteGroupPort:
+     * ConnectionStatus is the only available information in the ReportingContext.
+     * ConnectionStatus only knows the component ID; the component type is unknown.
+     * For example, there is no way to tell whether a connected component is a funnel or a RemoteGroupPort.
+     */
+    private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+        final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
+
+        // Create a RemoteInputPort Process.
+        // event.getComponentId returns the UUID of the RemoteGroupPort as an S2S client, which differs from the remote port UUID (portDataSetid).
+        // See NIFI-4571 for details.
+        final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
+        final String portProcessId = event.getComponentId();
+
+        final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+        remotePortProcess.setName(event.getComponentType());
+        remotePortProcess.addProcessor(portProcessId);
+
+        // For a RemoteInputPort, we need to find the previous component connected to this port
+        // that passed this particular FlowFile.
+        // That is only possible by calling the lineage API.
+        if (isRemoteInputPort) {
+            final Prove

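The comments in processRemotePortEvent say that, for a Remote Input Port, the upstream component that passed a given FlowFile can only be found through the lineage API. The sketch below illustrates that lookup on a deliberately simplified, hypothetical model: lineage edges link provenance events directly, and each event maps to a component ID. It does not use NiFi's ComputeLineageResult, LineageEdge, or LineageNode classes, whose real graphs also contain FlowFile nodes. The Edge class and the method name are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpstreamComponentSketch {

    // Hypothetical lineage edge: a parent provenance event leads to a child event.
    static final class Edge {
        final long parentEventId;
        final long childEventId;

        Edge(long parentEventId, long childEventId) {
            this.parentEventId = parentEventId;
            this.childEventId = childEventId;
        }
    }

    // Returns the distinct component IDs whose events directly precede the port's SEND event,
    // ignoring events recorded by the port itself.
    static List<String> upstreamComponentIds(long sendEventId, String portComponentId,
                                             List<Edge> edges, Map<Long, String> componentIdByEvent) {
        List<String> result = new ArrayList<>();
        for (Edge edge : edges) {
            if (edge.childEventId != sendEventId) {
                continue;
            }
            String componentId = componentIdByEvent.get(edge.parentEventId);
            if (componentId != null && !componentId.equals(portComponentId) && !result.contains(componentId)) {
                result.add(componentId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> componentIdByEvent = new HashMap<>();
        componentIdByEvent.put(1L, "generate-flowfile");
        componentIdByEvent.put(2L, "update-attribute");
        componentIdByEvent.put(3L, "remote-input-port");
        List<Edge> edges = Arrays.asList(new Edge(1L, 2L), new Edge(2L, 3L));
        // prints: [update-attribute]
        System.out.println(upstreamComponentIds(3L, "remote-input-port", edges, componentIdByEvent));
    }
}
```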
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291122#comment-16291122
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r156992235
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+    private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class);
+    private final NiFiFlow nifiFlow;
+    private final ClusterResolver clusterResolver;
+    private final ProvenanceRepository provenanceRepository;
+
+    public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver,
+                                   ProvenanceRepository provenanceRepository) {
+        this.nifiFlow = nifiFlow;
+        this.clusterResolver = clusterResolver;
+        this.provenanceRepository = provenanceRepository;
+    }
+
+    @Override
+    public List<ConnectionStatus> findConnectionTo(String componentId) {
+        return nifiFlow.getIncomingRelationShips(componentId);
+    }
+
+    @Override
+    public List<ConnectionStatus> findConnectionFrom(String componentId) {
+        return nifiFlow.getOutgoingRelationShips(componentId);
+    }
+
+    @Override
+    public String getNiFiClusterName() {
+        return nifiFlow.getClusterName();
+    }
+
+    @Override
+    public ClusterResolver getClusterResolver() {
+        return clusterResolver;
+    }
+
+    private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) {
+        final ComputeLineageResult result = submission.getResult();
+        try {
+            if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
+                return result;
+            }
+            logger.warn("Lineage query for {} timed out.", new Object[]{eventId});
+        } catch (InterruptedException e) {
+            logger.warn("Lineage query for {} was interrupted due to {}.", new Object[]{eventId, e}, e);
+        } finally {
+            submission.cancel();
+        }
+
+        return null;
+    }
+
+    @Override
+    public ComputeLineageResult queryLineage(long eventId) {
+        final ComputeLineageSubmission submission = provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
+        return getLineageResult(eventId, submission);
+    }
+
+    public ComputeLineageResult findParents(long eventId) {
+        final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER);
+        return getLineageResult(eventId, submission);
+    }
+
+    // NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation
+    private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
--- End diff --
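getLineageResult follows a common bounded-wait pattern, with three parts:
- wait up to a fixed timeout;
- give up and return null on timeout or interruption;
- always cancel the submission in a finally block.

The sketch below shows the same shape with plain java.util.concurrent.Future instead of NiFi's ComputeLineageSubmission. The method name awaitOrCancel and its timeout parameter are illustrative assumptions. One deliberate difference: it restores the thread's interrupt flag after catching InterruptedException.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {

    // Waits up to the given timeout for a result. On timeout or interruption it returns null;
    // in every case the task is cancelled afterwards (a no-op if it already finished).
    static <T> T awaitOrCancel(Future<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            future.cancel(true);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> fast = pool.submit(() -> "done");
            Future<String> slow = pool.submit(() -> {
                Thread.sleep(5_000); // simulates a long-running lineage computation
                return "late";
            });
            System.out.println(awaitOrCancel(fast, 1, TimeUnit.SECONDS));        // done
            System.out.println(awaitOrCancel(slow, 100, TimeUnit.MILLISECONDS)); // null
            System.out.println(slow.isCancelled());                              // true
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The sketch takes the timeout as a parameter rather than hard-coding it. Callers can then choose a bound that suits the size of their provenance repository.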


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291088#comment-16291088
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r156985994
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class);
+private final NiFiFlow nifiFlow;
+private final ClusterResolver clusterResolver;
+private final ProvenanceRepository provenanceRepository;
+
+public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver,
+   ProvenanceRepository provenanceRepository) {
+this.nifiFlow = nifiFlow;
+this.clusterResolver = clusterResolver;
+this.provenanceRepository = provenanceRepository;
+}
+
+@Override
+public List<ConnectionStatus> findConnectionTo(String componentId) {
+return nifiFlow.getIncomingRelationShips(componentId);
+}
+
+@Override
+public List<ConnectionStatus> findConnectionFrom(String componentId) {
+return nifiFlow.getOutgoingRelationShips(componentId);
+}
+
+@Override
+public String getNiFiClusterName() {
+return nifiFlow.getClusterName();
+}
+
+@Override
+public ClusterResolver getClusterResolver() {
+return clusterResolver;
+}
+
+private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) {
+final ComputeLineageResult result = submission.getResult();
+try {
+if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
--- End diff --

I'm not yet sure of the context in which this is used. However, this is a bit 
of a red flag. Depending on the size of the Provenance Repository and its 
implementation, computing a FlowFile's lineage may take many minutes (consider 
a Provenance Repository that is a couple of terabytes in size where the event 
is one of the oldest: it will have to search all Lucene indices for it). In 
most cases it is fairly quick, but we should not assume that it will be. What 
exactly is the implication here if this always times out?
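A minimal sketch of one possible answer, assuming the caller can tolerate missing lineage for a reporting cycle: make the wait configurable and treat a timeout as "lineage not available" instead of an error. `LineageResult` is a hypothetical stand-in that mirrors only the `awaitCompletion(long, TimeUnit)` call used in the diff; `BoundedLineageWait`, `awaitLineage` and `latchResult` are illustrative names, not part of the PR.

```java
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for NiFi's ComputeLineageResult; only the blocking
// wait used in the diff is modeled here.
interface LineageResult {
    boolean awaitCompletion(long time, TimeUnit unit) throws InterruptedException;
}

final class BoundedLineageWait {
    private final long timeoutMillis;

    // The timeout is supplied by the caller (for example from a reporting task
    // property) instead of being hard-coded to 10 seconds.
    BoundedLineageWait(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Returns the result if the computation finished in time, otherwise empty,
    // so the caller can skip this event instead of failing the whole report.
    Optional<LineageResult> awaitLineage(LineageResult result) {
        try {
            if (result.awaitCompletion(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return Optional.of(result);
            }
            return Optional.empty(); // timed out: lineage not available (yet)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return Optional.empty();
        }
    }

    // Latch-backed result used to simulate a fast or a never-ending computation:
    // CountDownLatch.await(long, TimeUnit) returns false when the wait times out.
    static LineageResult latchResult(CountDownLatch done) {
        return done::await;
    }
}
```

Returning `Optional.empty()` keeps one slow query from failing the whole reporting run; whether skipped events should be retried in a later cycle is a separate decision this sketch does not make.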


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>
> While Apache NiFi has provenance and event-level lineage support within its 
> data flow, Apache Atlas also manages lineage between datasets and the processes 
> that interact with such data. 
> It would be beneficial for users who use both NiFi and Atl

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291062#comment-16291062
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r156984919
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+public class NiFiProvenanceEventAnalyzerFactory {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiProvenanceEventAnalyzerFactory.class);
+private static final Map<Pattern, NiFiProvenanceEventAnalyzer> analyzersForComponentType = new ConcurrentHashMap<>();
+private static final Map<Pattern, NiFiProvenanceEventAnalyzer> analyzersForTransitUri = new ConcurrentHashMap<>();
+private static final Map<ProvenanceEventType, NiFiProvenanceEventAnalyzer> analyzersForProvenanceEventType = new ConcurrentHashMap<>();
+private static boolean loaded = false;
+
+private static void loadAnalyzers() {
+logger.debug("Loading NiFiProvenanceEventAnalyzer ...");
+final ServiceLoader<NiFiProvenanceEventAnalyzer> serviceLoader
+= ServiceLoader.load(NiFiProvenanceEventAnalyzer.class);
+serviceLoader.forEach(analyzer -> {
+addAnalyzer(analyzer.targetComponentTypePattern(), analyzersForComponentType, analyzer);
+addAnalyzer(analyzer.targetTransitUriPattern(), analyzersForTransitUri, analyzer);
+final ProvenanceEventType eventType = analyzer.targetProvenanceEventType();
+if (eventType != null) {
+if (analyzersForProvenanceEventType.containsKey(eventType)) {
+logger.warn("For ProvenanceEventType {}, an Analyzer {} is already assigned." +
+" Only one analyzer for a type can be registered. Ignoring {}",
+eventType, analyzersForProvenanceEventType.get(eventType), analyzer);
+} else {
+analyzersForProvenanceEventType.put(eventType, analyzer);
+}
+}
+});
+logger.info("Loaded NiFiProvenanceEventAnalyzers: componentTypes={}, transitUris={}", analyzersForComponentType, analyzersForTransitUri);
+}
+
+private static void addAnalyzer(String patternStr, Map<Pattern, NiFiProvenanceEventAnalyzer> toAdd,
+NiFiProvenanceEventAnalyzer analyzer) {
+if (patternStr != null && !patternStr.isEmpty()) {
+Pattern pattern = Pattern.compile(patternStr.trim());
+toAdd.put(pattern, analyzer);
+}
+}
+
+/**
+ * Find and retrieve a NiFiProvenanceEventAnalyzer implementation for the specified targets.
+ * Pattern matching is performed in the following order, and the first match is returned:
+ * <ol>
+ * <li>Component type name. Use an analyzer supporting the component type with its {@link NiFiProvenanceEventAnalyzer#targetComponentTypePattern()}.</li>
+ * <li>Transit URI. Use an analyzer supporting the transit URI with its {@link NiFiProvenanceEventAnalyzer#targetTransitUriPattern()}.</li>
+ * <li>Provenance event type. Use an analyzer supporting the provenance event type with its {@link NiFiProvenanceEventAnalyzer#targetProvenanceEventType()}.</li>
+ * </ol>
+ * @param typeName NiFi component type name.
+ * @param transitUri Transit URI.
+ * @param eventType Provenance event type.
+ * @return Instance of NiFiProvenanceEventAnalyzer if one is found for the specified targets, otherwise null.
+ */
+p
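The lookup order documented in the Javadoc above, together with the "only one analyzer per event type" rule from `loadAnalyzers()`, can be sketched without NiFi dependencies. This is a simplified, hypothetical model: `AnalyzerRegistry` is not a class in the PR, event types are plain strings instead of `ProvenanceEventType`, registration is explicit instead of via `ServiceLoader`, and patterns are assumed to require a full match (`Matcher.matches()`); the PR's actual matching semantics may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical, simplified registry: component type pattern first,
// then transit URI pattern, then provenance event type.
final class AnalyzerRegistry<A> {
    private final Map<Pattern, A> byComponentType = new LinkedHashMap<>();
    private final Map<Pattern, A> byTransitUri = new LinkedHashMap<>();
    private final Map<String, A> byEventType = new LinkedHashMap<>();

    void register(String componentTypePattern, String transitUriPattern, String eventType, A analyzer) {
        addPattern(componentTypePattern, byComponentType, analyzer);
        addPattern(transitUriPattern, byTransitUri, analyzer);
        if (eventType != null) {
            // First registration wins, as the "Ignoring {}" warning intends.
            A existing = byEventType.putIfAbsent(eventType, analyzer);
            if (existing != null) {
                System.err.println("Ignoring duplicate analyzer for event type " + eventType);
            }
        }
    }

    private static <T> void addPattern(String patternStr, Map<Pattern, T> target, T analyzer) {
        if (patternStr != null && !patternStr.trim().isEmpty()) {
            target.put(Pattern.compile(patternStr.trim()), analyzer);
        }
    }

    // Returns the first analyzer found in priority order, or null if none matches.
    A find(String typeName, String transitUri, String eventType) {
        A found = firstMatch(byComponentType, typeName);
        if (found == null) {
            found = firstMatch(byTransitUri, transitUri);
        }
        if (found == null && eventType != null) {
            found = byEventType.get(eventType);
        }
        return found;
    }

    private static <T> T firstMatch(Map<Pattern, T> patterns, String value) {
        if (value == null) {
            return null;
        }
        for (Map.Entry<Pattern, T> entry : patterns.entrySet()) {
            if (entry.getKey().matcher(value).matches()) {
                return entry.getValue();
            }
        }
        return null;
    }
}
```

Using `LinkedHashMap` makes the winner deterministic (registration order) when several patterns match the same value; with a `ConcurrentHashMap`, as in the diff, iteration order is unspecified.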

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291054#comment-16291054
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r156982816
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}
   

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291051#comment-16291051
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r156982630
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
   

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291034#comment-16291034
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r156979528
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
   

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285296#comment-16285296
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on the issue:

https://github.com/apache/nifi/pull/2335
  
Dear reviewers: this PR has more than 20K lines of code, so it may be 
difficult to review quickly. However, I wrote as much documentation as 
possible in the additionalDetails.html of the NiFiAtlasFlowLineage reporting 
task, and hopefully it helps make the review cycle quicker. Please read the doc 
first. Thanks in advance for your time and effort!





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285293#comment-16285293
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

GitHub user ijokarumawak opened a pull request:

https://github.com/apache/nifi/pull/2335

NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [x] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [x] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [x] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [x] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijokarumawak/nifi nifi-3709-5

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2335.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2335


commit d710ceee0fd6076c9bac90f0e3b55f5bb990bd33
Author: Koji Kawamura 
Date:   2017-10-30T03:41:27Z

NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas






[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-03 Thread Koji Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276267#comment-16276267
 ] 

Koji Kawamura commented on NIFI-3709:
-

Until NIFI-4564 is addressed, Site-to-Site (S2S) transfers using the RAW 
protocol will not be reported in Atlas lineage.



[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-11-02 Thread Koji Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235464#comment-16235464
 ] 

Koji Kawamura commented on NIFI-3709:
-

[~ikethecoder] I've added an extension point inspired by your improvement. 
Would you rebase your branch onto this one? Other parts have changed too, so you 
may need to adjust your code; however, adding a FlowFile-level lineage strategy 
should be easier than before.
https://github.com/ijokarumawak/nifi/tree/nifi-3709-4

If you can extract a commit that contains only the FlowFile-level lineage 
strategy additions, it would be very easy for me to cherry-pick it, so that your 
contribution is recorded in the commit history and included when I create a PR 
from my branch.

Also, I've added another strategy: my branch now has 'Simple Path' and 
'Complete Path'. I've summarized a comparison of the two in this Gist. Would you 
take a look? I wonder if 'Complete Path' would be useful for your use case, too. 
Please let me know what you think. Thanks!
https://gist.github.com/ijokarumawak/9c3f2b0e44d1f3f2cb1addc9eb53d893

> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-10-30 Thread Koji Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224538#comment-16224538
 ] 

Koji Kawamura commented on NIFI-3709:
-

[~ikethecoder] Thanks for sharing the detailed example. I understand the 
motivation to track lineage 'By File'. I'll think about it a bit more to see if 
there are other ways to achieve that.

> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-10-29 Thread Aidan Cope (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224291#comment-16224291
 ] 

Aidan Cope commented on NIFI-3709:
--

True, and we were aware of the lineage capability in NiFi itself, but as you 
mention, it is limited to what is tracked in NiFi, so you miss out on the 
"full" lineage.

I've prepared an example to try and show where the requirement is coming from:
  
https://github.com/ikethecoder/nifi-atlas/blob/master/scenarios/path_vs_file_comparison/comparison.md

I built a flow that shows a file being received, processed and sent to HDFS. I 
ran files through it with the reporter configured for "By Path" and then again 
for "By File", and included screenshots of the resulting lineage in Atlas.

Have a look: you will see that the "By Path" strategy does not give us the 
detail that we want. You may have ideas on how to meet the same requirement 
in a way that is consistent with the "By Path" strategy.

Cheers

> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-10-24 Thread Koji Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218156#comment-16218156
 ] 

Koji Kawamura commented on NIFI-3709:
-

[~ikethecoder] Thanks for sharing your work. I'm glad to know that you were 
able to use the extension point to analyze provenance events, such as those 
from PutFile. Those parts can be merged easily, so let me merge your commit 
once my work has been squashed into a final commit. Let's keep in touch!

As for the ByFileLineageStrategy, I am not sure it will work well with Atlas, 
because Atlas is designed to track DataSet-level lineage rather than 
event-level lineage. That's why I decided to reduce the granularity to the 
DataSet and flow_path level when the NiFi reporting task reports lineage to Atlas.

If a user would like to see the complete, detailed lineage of a FlowFile, I 
think the NiFi provenance viewer is more appropriate. Using the NiFi provenance 
UI, the user can see where the FlowFile came from and where it went.
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage

I think the core benefit of exporting NiFi lineage to Atlas is that it lets users 
see the bigger picture, not only within NiFi but also across other software such 
as Apache Storm and Kafka. Also, IMHO, a FlowFile lives inside a NiFi flow and 
does not have the same level of persistence as a database record or a file on a 
file system.

However, I may be missing something. Would you elaborate on the benefit of 
exporting FlowFile-level lineage to Atlas? An example Atlas lineage screenshot 
would help me understand the use case. Thanks!

> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-10-24 Thread Aidan Cope (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218124#comment-16218124
 ] 

Aidan Cope commented on NIFI-3709:
--

I have been working on some enhancements to the work that was being done by 
Koji here:

https://github.com/ijokarumawak/nifi/tree/nifi-3709-2/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task

The work in progress can be seen here:

https://github.com/ikethecoder/nifi/tree/nifi-3709-2/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task

I would like to get feedback on the changes I am proposing. The work is for a 
client whose requirements call for quite detailed tracking of provenance data 
from NiFi to Atlas. To this end, I added a parameter to the reporting task that 
sets the granularity of the Atlas processes, with two options: "By Flow Path" 
and "By Flow File". "By Flow Path" is Koji's existing approach. "By Flow File" 
uses the FlowFile UUID, rather than the component ID, as the qualified name of 
the process. The client wants to be able to track files through the complete 
NiFi flow over time, which cannot easily be done with the current 
implementation. I think both strategies are valid, depending on the use case 
and requirements, which is why I made the choice a parameter on the reporting 
task.
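A minimal sketch of how the two granularity options differ, assuming a hypothetical `LineageGranularity` enum and an assumed `<id>@<clusterName>` qualified-name convention (neither is taken from the actual branch). Under "By Flow Path" every FlowFile passing through the same path maps to one Atlas process entity; under "By Flow File" each FlowFile produces its own entity:

```java
import java.util.HashSet;
import java.util.Set;

public class LineageGranularityDemo {

    // Hypothetical enum mirroring the two reporting-task options described above.
    enum LineageGranularity { BY_FLOW_PATH, BY_FLOW_FILE }

    // Assumed naming convention "<id>@<clusterName>"; illustrative only.
    static String qualifiedName(LineageGranularity granularity, String componentId,
                                String flowFileUuid, String clusterName) {
        // By Flow Path keys on the component, By Flow File keys on the FlowFile.
        String id = granularity == LineageGranularity.BY_FLOW_PATH ? componentId : flowFileUuid;
        return id + "@" + clusterName;
    }

    public static void main(String[] args) {
        Set<String> byPath = new HashSet<>();
        Set<String> byFile = new HashSet<>();
        for (int i = 0; i < 3; i++) {
            String flowFileUuid = "flowfile-" + i; // stand-in for a real FlowFile UUID
            byPath.add(qualifiedName(LineageGranularity.BY_FLOW_PATH, "put-hdfs-1", flowFileUuid, "cluster1"));
            byFile.add(qualifiedName(LineageGranularity.BY_FLOW_FILE, "put-hdfs-1", flowFileUuid, "cluster1"));
        }
        System.out.println("By Flow Path entities: " + byPath.size()); // 1
        System.out.println("By Flow File entities: " + byFile.size()); // 3
    }
}
```

With "By Flow File" the number of Atlas process entities grows with FlowFile volume, which is the trade-off at the heart of the granularity discussion in this thread.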

I would really appreciate feedback on whether this is a viable approach and 
whether it could be considered for a pull request.


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154615#comment-16154615
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak closed the pull request at:

https://github.com/apache/nifi/pull/1676


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154614#comment-16154614
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on the issue:

https://github.com/apache/nifi/pull/1676
  
Closing this PR. I'm adding the use of provenance events so that more 
processors can report lineage, which also covers NiFi Expression Language. The 
WIP code is available here, and I'll create another PR once it's ready for 
review. Thanks!

https://github.com/ijokarumawak/nifi/tree/nifi-3709-2/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154305#comment-16154305
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user gjlawran commented on the issue:

https://github.com/apache/nifi/pull/1676
  
We are also looking for assistance with bridging metadata from NiFi into 
Apache Atlas, and we have posted a [paid 
opportunity](https://github.com/bcgov/nifi-atlas/issues/1) for those with an interest.  


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-25 Thread Joseph Witt (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15983595#comment-15983595
 ] 

Joseph Witt commented on NIFI-3709:
---

[~ijokarumawak] I've removed the fix version for now. In my opinion it isn't 
quite ready for a release. If you disagree, please advise.

> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982308#comment-15982308
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user joewitt commented on the issue:

https://github.com/apache/nifi/pull/1676
  
@ijokarumawak so I see where this is heading and I get how it can help. 
That said, given Expression Language and similar features, I'm concerned about 
how much real flow information can be gleaned at design time (i.e., by looking 
at the flow definition itself) versus at runtime using the provenance data. 
The current approach has some important limitations; for example, the 
processors that can be supported have to be explicitly referenced in code for 
now. The runtime approach using provenance events obviously has scale concerns, 
but perhaps it is more correct and warrants further review. What do you 
think?
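A rough sketch of the runtime approach, assuming Java 16+ and a simplified, hypothetical `ProvEvent` record in place of NiFi's real provenance API: many RECEIVE/SEND events are collapsed into a small set of distinct dataset-level edges. Because a transit URI is recorded for the concrete destination actually used, the runtime view should already reflect any Expression Language that was evaluated; the URIs below are made up:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RuntimeLineageDemo {

    // Simplified subset of provenance event types; illustrative only.
    enum EventType { RECEIVE, SEND }

    // Hypothetical stand-in for a provenance event record.
    record ProvEvent(EventType type, String componentId, String transitUri) {}

    // One dataset-level edge: which component read (input) or wrote (output) which dataset.
    record LineageEdge(String componentId, String datasetUri, boolean input) {}

    static Set<LineageEdge> toEdges(List<ProvEvent> events) {
        Set<LineageEdge> edges = new LinkedHashSet<>(); // records give value equality, so duplicates collapse
        for (ProvEvent e : events) {
            edges.add(new LineageEdge(e.componentId(), e.transitUri(), e.type() == EventType.RECEIVE));
        }
        return edges;
    }

    public static void main(String[] args) {
        List<ProvEvent> events = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            events.add(new ProvEvent(EventType.RECEIVE, "consume-kafka", "PLAINTEXT://kafka:9092/topic-a"));
            events.add(new ProvEvent(EventType.SEND, "put-hdfs", "hdfs://nn:8020/data/out"));
        }
        System.out.println(events.size() + " events -> " + toEdges(events).size() + " edges");
        // prints "2000 events -> 2 edges"
    }
}
```

The deduplication step is where the scale concern lands: every event still has to be scanned, even though only a handful of distinct edges end up being reported to Atlas.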


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
> Fix For: 1.2.0
>
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982297#comment-15982297
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1676#discussion_r113096464
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/processors/EgressProcessors.java
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.processors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EgressProcessors {
+
+    private static final Map processors = new HashMap<>();
+
+    static {
+        processors.put("org.apache.nifi.processors.hive.PutHiveStreaming", new PutHiveStreaming());
--- End diff --

So these egress/ingress processors are all that would be supported 
initially, I take it? It will be interesting to see how this works out; perhaps 
we can then do this via an annotation on the processors that support 
egress/ingress.
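One way the annotation idea might look, as a sketch only: `ReportsLineage` and `Direction` are hypothetical names, not part of the NiFi API, and the nested processor classes are empty stand-ins for the real ones. The registry is built by reflection instead of a hand-maintained static map:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AnnotationRegistryDemo {

    enum Direction { INGRESS, EGRESS }

    // Hypothetical marker annotation; RUNTIME retention so reflection can see it.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ReportsLineage {
        Direction value();
    }

    // Empty stand-ins for real processor classes.
    @ReportsLineage(Direction.EGRESS)
    static class PutHiveStreaming {}

    @ReportsLineage(Direction.INGRESS)
    static class ConsumeKafka {}

    static class UpdateAttribute {} // not annotated: touches no external dataset

    // Keeps only annotated classes, keyed by simple name for readability.
    static Map<String, Direction> buildRegistry(List<Class<?>> candidates) {
        Map<String, Direction> registry = new HashMap<>();
        for (Class<?> c : candidates) {
            ReportsLineage ann = c.getAnnotation(ReportsLineage.class);
            if (ann != null) {
                registry.put(c.getSimpleName(), ann.value());
            }
        }
        return registry;
    }

    public static void main(String[] args) {
        Map<String, Direction> registry = buildRegistry(
                List.of(PutHiveStreaming.class, ConsumeKafka.class, UpdateAttribute.class));
        System.out.println(registry.get("PutHiveStreaming"));        // EGRESS
        System.out.println(registry.containsKey("UpdateAttribute")); // false
    }
}
```

The design choice here is that support becomes a property declared by each processor rather than a list maintained in the reporting task, though something would still need to enumerate the candidate classes.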


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
> Fix For: 1.2.0
>
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982295#comment-15982295
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1676#discussion_r113096205
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml ---
@@ -0,0 +1,102 @@
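+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-atlas-bundle</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-atlas-reporting-task</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-client-dto</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-intg</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jettison</groupId>
+            <artifactId>jettison</artifactId>
+            <version>1.1</version>
+            <exclusions>
+
+                <exclusion>
+                    <groupId>stax</groupId>
+                    <artifactId>stax-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-jaxrs</artifactId>
+            <version>1.9.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-xc</artifactId>
+            <version>1.9.13</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>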
--- End diff --

You can remove this version number, and probably the whole block. We have 
junit as a test-scope dependency in the parent POM.


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
> Fix For: 1.2.0
>
>


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982292#comment-15982292
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1676#discussion_r113095894
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-nar/src/main/resources/META-INF/NOTICE
 ---
@@ -0,0 +1,188 @@
+nifi-atlas-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===
+Apache Software License v2
+===
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Atlas (incubating)
+The following NOTICE information applies:
+  Apache Atlas (incubating)
+  
+  Copyright [2015-2017] The Apache Software Foundation
+  
+  This product includes software developed at
--- End diff --

You can remove lines 19 and 20.


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
> Fix For: 1.2.0
>
>
> While Apache NiFi has provenance and event-level lineage support within its 
> data flow, Apache Atlas also manages lineage between datasets and the processes 
> that interact with such data. 
> It would be beneficial for users who use both NiFi and Atlas if they could 
> see end-to-end data lineage in the Atlas lineage graph, as some types of datasets 
> are processed both by NiFi and by technologies around Atlas such as Storm, 
> Falcon or Sqoop; for example, Kafka topics and Hive tables.
> In order to make this integration happen, I propose a NiFi reporting task 
> that analyzes the NiFi flow and then creates DataSet and Process entities in Atlas.
> The challenge is how to design NiFi flow dataset-level lineage within the Atlas 
> lineage graph.
> If we just add a single NiFi process and connect every DataSet from/to it, it 
> would be too ambiguous, since it won't be clear which part of a NiFi flow 
> actually interacts with a given dataset.
> But if we put every NiFi processor as an independent process in Atlas, it would 
> be too granular. Also, since we already have detailed event-level lineage in 
> NiFi, we don't need the same level in Atlas.
> If we can group certain processors in a NiFi flow as a process in Atlas, it 
> would be a nice granularity.
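The "group certain processors" idea can be illustrated with a small sketch. It does not reproduce the AtlasNiFiFlowLineage reporting task from pull request #1676; it is a hypothetical illustration in Python over a toy flow model (the `Processor` class, the `group_into_processes` function and all processor and dataset names are made up). It takes one possible middle ground: each weakly connected set of processors becomes one candidate Atlas process, whose inputs and outputs are the external datasets its members read and write.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Processor:
    """Hypothetical toy model of a NiFi processor and the external
    datasets (e.g. Kafka topics, Hive tables) it reads from or writes to."""
    pid: str
    inputs: set = field(default_factory=set)
    outputs: set = field(default_factory=set)


def group_into_processes(processors, connections):
    """Group weakly connected processors into one candidate Atlas process each.

    processors:  dict of processor id -> Processor
    connections: list of (source_id, target_id) pairs, like NiFi connections
    Returns a list of (member_ids, input_datasets, output_datasets) tuples.
    """
    # Undirected adjacency: connection direction does not matter for grouping.
    adjacency = defaultdict(set)
    for src, dst in connections:
        adjacency[src].add(dst)
        adjacency[dst].add(src)

    groups, visited = [], set()
    for start in sorted(processors):  # sorted for deterministic output
        if start in visited:
            continue
        visited.add(start)
        members, inputs, outputs = set(), set(), set()
        stack = [start]
        # Depth-first walk collecting every processor reachable from `start`.
        while stack:
            pid = stack.pop()
            members.add(pid)
            inputs |= processors[pid].inputs
            outputs |= processors[pid].outputs
            for nxt in adjacency[pid]:
                if nxt not in visited:
                    visited.add(nxt)
                    stack.append(nxt)
        groups.append((sorted(members), sorted(inputs), sorted(outputs)))
    return groups


# Usage: two unrelated flows on one canvas become two candidate Atlas processes.
procs = {
    "ConsumeKafka": Processor("ConsumeKafka", inputs={"kafka:events"}),
    "UpdateRecord": Processor("UpdateRecord"),
    "PutHiveStreaming": Processor("PutHiveStreaming", outputs={"hive:default.events"}),
    "GetFile": Processor("GetFile", inputs={"fs:/data/in"}),
    "PutHDFS": Processor("PutHDFS", outputs={"hdfs:/data/out"}),
}
conns = [
    ("ConsumeKafka", "UpdateRecord"),
    ("UpdateRecord", "PutHiveStreaming"),
    ("GetFile", "PutHDFS"),
]
for members, ins, outs in group_into_processes(procs, conns):
    print(members, ins, "->", outs)
```

Grouping by connected components is the coarsest split that still keeps unrelated flows apart; a finer variant could also split a group wherever the flow branches or merges, at the cost of more Atlas process entities.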



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976284#comment-15976284
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on the issue:

https://github.com/apache/nifi/pull/1676
  
@joewitt Added L&N. I hope I've done it correctly. Please check.
BTW, [SmartBear has acquired Swagger from Reverb Technologies](https://smartbear.com/news/news-releases/sponsorship-of-swagger/) 
and the Swagger copyright has been changed to [SmartBear Software](https://github.com/swagger-api/swagger-core/blob/master/LICENSE). 
nifi-assembly and nifi-framework-nar still have 'Copyright 2015 Reverb 
Technologies, Inc.' in their NOTICE files. Should we update those, too?




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976131#comment-15976131
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on the issue:

https://github.com/apache/nifi/pull/1676
  
@joewitt Thanks for pointing that out. I will update NOTICE.




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976124#comment-15976124
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user joewitt commented on the issue:

https://github.com/apache/nifi/pull/1676
  
Doesn't quite look like all L&N considerations for the NAR have been 
addressed. Please update.




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976123#comment-15976123
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1676#discussion_r112369112
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-nar/src/main/resources/META-INF/NOTICE
 ---
@@ -0,0 +1,26 @@
+nifi-hbase-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===
+Apache Software License v2
+===
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Atlas
+The following NOTICE information applies:
+  Apache Commons BeanUtils
--- End diff --

Copy-and-paste error. Should say 'Apache Atlas'.




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-19 Thread Joseph Witt (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976122#comment-15976122
 ] 

Joseph Witt commented on NIFI-3709:
---

Patch looks ready to roll, and someone just asked about this capability today on the 
users list. Set fix version and will review more fully shortly. NAR size is 
down to 13MB, which is really nice.



[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976120#comment-15976120
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user joewitt commented on the issue:

https://github.com/apache/nifi/pull/1676
  
@ijokarumawak can you go ahead and squash these? Thanks for getting the 
NAR down to 13MB. Much better!




[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15971333#comment-15971333
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

GitHub user ijokarumawak opened a pull request:

https://github.com/apache/nifi/pull/1676

NIFI-3709: NiFi flow lineage to Apache Atlas.

Adding AtlasNiFiFlowLineage reporting task to create NiFi lineage in
Atlas. See the additionalDetails.html for details.

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?

- [x] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [x] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijokarumawak/nifi nifi-3709

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/1676.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1676


commit 70573ba42b282cc92fbbab5d631dd0ed21ab9a00
Author: Koji Kawamura 
Date:   2017-03-27T23:01:02Z

NIFI-3709: NiFi flow lineage to Apache Atlas.

Adding AtlasNiFiFlowLineage reporting task to create NiFi lineage in
Atlas. See the additionalDetails.html for details.






