[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295292#comment-16295292 ]

ASF GitHub Bot commented on NIFI-3709:
--------------------------------------

Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/2335

    @ijokarumawak all looks good after some local testing. Thanks for your diligence and the willingness to keep going on this until we got everything resolved! I have now merged this to master!

> Export NiFi flow dataset lineage to Apache Atlas
> ------------------------------------------------
>
>                 Key: NIFI-3709
>                 URL: https://issues.apache.org/jira/browse/NIFI-3709
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>             Fix For: 1.5.0
>
>
> While Apache NiFi has provenance and event-level lineage support within its
> data flow, Apache Atlas also manages lineage between datasets and the
> processes that interact with such data.
> It would be beneficial for users who use both NiFi and Atlas if they could
> see end-to-end data lineage on the Atlas lineage graph, as some types of
> datasets, for example Kafka topics and Hive tables, are processed by both
> NiFi and technologies around Atlas such as Storm, Falcon or Sqoop.
> In order to make this integration happen, I propose a NiFi reporting task
> that analyzes a NiFi flow and then creates DataSet and Process entities in Atlas.
> The challenge is how to design NiFi flow dataset-level lineage within the
> Atlas lineage graph.
> If we just add a single NiFi process and connect every DataSet from/to it, it
> would be too ambiguous, since it wouldn't be clear which part of a NiFi flow
> actually interacts with a certain dataset.
> But if we put every NiFi processor as an independent process in Atlas, it
> would be too granular. Also, since we already have detailed event-level
> lineage in NiFi, we don't need the same level in Atlas.
> If we can group certain processors in a NiFi flow as a process in Atlas, that
> would be a good level of granularity.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
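The description leaves the grouping rule open. The Java below is a minimal sketch under one assumed rule, which is hypothetical. Consecutive processors joined by one-to-one connections form a single Atlas process (a "flow path"), and a new path starts wherever the flow merges or branches. The class, method and processor names are illustrative only and are not part of NiFi or Atlas. The sketch also assumes the flow is supplied as a simple adjacency map of component ids.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlowPathGrouping {

    /**
     * Hypothetical grouping rule (not NiFi's actual algorithm): a flow path is a maximal
     * chain of components in which every link is a one-to-one connection.
     * 'edges' maps a component id to its downstream component ids; an acyclic flow is assumed.
     */
    static List<List<String>> groupIntoPaths(Map<String, List<String>> edges) {
        // Collect every component together with its upstream components.
        Map<String, List<String>> upstream = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            upstream.putIfAbsent(e.getKey(), new ArrayList<>());
            for (String to : e.getValue()) {
                upstream.computeIfAbsent(to, k -> new ArrayList<>()).add(e.getKey());
            }
        }

        List<List<String>> paths = new ArrayList<>();
        for (String id : upstream.keySet()) {
            if (!startsPath(id, edges, upstream)) {
                continue;
            }
            // Extend the path while the link to the next component is one-to-one.
            List<String> path = new ArrayList<>();
            String current = id;
            path.add(current);
            while (true) {
                List<String> next = edges.getOrDefault(current, List.of());
                if (next.size() != 1 || upstream.get(next.get(0)).size() != 1) {
                    break;
                }
                current = next.get(0);
                path.add(current);
            }
            paths.add(path);
        }
        return paths;
    }

    // A component starts a new path unless it has exactly one upstream component
    // and that upstream component has exactly one downstream component.
    private static boolean startsPath(String id, Map<String, List<String>> edges,
                                      Map<String, List<String>> upstream) {
        List<String> ups = upstream.get(id);
        return ups.size() != 1 || edges.getOrDefault(ups.get(0), List.of()).size() != 1;
    }

    public static void main(String[] args) {
        // Illustrative flow: ConsumeKafka -> UpdateAttribute -> RouteOnAttribute -> {PutHDFS, PutHiveStreaming}
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("ConsumeKafka", List.of("UpdateAttribute"));
        edges.put("UpdateAttribute", List.of("RouteOnAttribute"));
        edges.put("RouteOnAttribute", List.of("PutHDFS", "PutHiveStreaming"));
        System.out.println(groupIntoPaths(edges));
    }
}
```

In this example, the chain from the Kafka consumer up to the routing processor becomes one path, and each branch after the route becomes its own path. A real reporting task would also need to attach each path's DataSet inputs and outputs.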
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295289#comment-16295289 ]

ASF GitHub Bot commented on NIFI-3709:
--------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2335
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295287#comment-16295287 ]

ASF subversion and git services commented on NIFI-3709:
-------------------------------------------------------

Commit 9750cf2fcda0209e897e98e632f7e42f3f17e3d9 in nifi's branch refs/heads/master from [~markap14]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=9750cf2 ]

NIFI-3709: Added XMLENC to L&N file. Moved inclusion of nifi-atlas-nar to a profile named include-atlas.
This closes #2335.
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295286#comment-16295286 ]

ASF subversion and git services commented on NIFI-3709:
-------------------------------------------------------

Commit fc73c609240de81d7379bda7c281f064ebe02714 in nifi's branch refs/heads/master from [~ijokarumawak]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=fc73c60 ]

NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295163#comment-16295163 ]

ASF GitHub Bot commented on NIFI-3709:
--------------------------------------

Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/2335

    @ijokarumawak yes, I believe your understanding is correct. Thanks for the clarifications! In terms of code, I am good with the PR. I just want to test this against a live instance, and as soon as I can confirm that all works well, I can push to master.

    However, I do see that this nar alone is > 60 MB. For now, I think it would be best to add a Maven profile for activating this so that those who don't need it don't have to pull in the extra weight. Is that OK with you?
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16294418#comment-16294418 ]

ASF GitHub Bot commented on NIFI-3709:
--------------------------------------

Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2335

    @markap14 I found that ReportLineageToAtlas does get UNKNOWN provenance events when it queries lineage with `ProvenanceRepository.submitLineageComputation(final long eventId, final NiFiUser user)`. It uses the event hierarchies returned by that method, but to analyze each event's details fully, it also calls `ProvenanceEventRepository.getEvent(final long id)`, which does not require a user. That's why the reporting task worked even in a secured NiFi cluster.

    I now understand why the NiFi framework has two interfaces for provenance events, i.e. ProvenanceRepository and ProvenanceEventRepository. `ProvenanceRepository` provides methods to be called in the context of the user who made the request. `ProvenanceEventRepository` is used outside of a user request and is intended for Reporting Tasks, etc.

    Is my understanding correct? If so, does the approach of this PR look good now? Thanks!
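To make the distinction in the comment above concrete, the Java below models it with deliberately simplified types. `Event`, `UserScopedRepository`, `TrustedEventRepository` and `enrich` are hypothetical stand-ins, not the real NiFi interfaces, whose signatures are only partially quoted here. The sketch assumes that a user-scoped lineage query returns placeholder events typed UNKNOWN for anything the user may not see. A reporting task, running outside any user request, can then re-read each event by id from the repository that does not take a user.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LineageEnrichment {

    // Hypothetical, simplified event: only an id and an event type ("UNKNOWN" when redacted).
    record Event(long id, String type) {}

    // Hypothetical stand-in for a user-scoped lineage query.
    interface UserScopedRepository {
        List<Event> lineage(long eventId, String user);
    }

    // Hypothetical stand-in for a lookup used outside of a user request (e.g. by a reporting task).
    interface TrustedEventRepository {
        Optional<Event> getEvent(long id);
    }

    // Keeps the lineage returned by the user-scoped query, but replaces each UNKNOWN
    // placeholder with the full event read by id; placeholders that cannot be found are kept.
    static List<Event> enrich(List<Event> lineage, TrustedEventRepository trusted) {
        List<Event> result = new ArrayList<>();
        for (Event e : lineage) {
            if ("UNKNOWN".equals(e.type())) {
                result.add(trusted.getEvent(e.id()).orElse(e));
            } else {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Event> store = Map.of(1L, new Event(1, "RECEIVE"), 2L, new Event(2, "SEND"));
        UserScopedRepository scoped = (id, user) -> List.of(new Event(1, "UNKNOWN"), new Event(2, "UNKNOWN"));
        TrustedEventRepository trusted = id -> Optional.ofNullable(store.get(id));
        System.out.println(enrich(scoped.lineage(2, "reporting-task"), trusted));
    }
}
```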
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292870#comment-16292870 ]

ASF GitHub Bot commented on NIFI-3709:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2335#discussion_r157257088

    --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.atlas.provenance;
    +
    +import org.apache.nifi.atlas.NiFiFlow;
    +import org.apache.nifi.atlas.resolver.ClusterResolver;
    +import org.apache.nifi.authorization.user.NiFiUser;
    +import org.apache.nifi.controller.status.ConnectionStatus;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.provenance.ProvenanceRepository;
    +import org.apache.nifi.provenance.lineage.ComputeLineageResult;
    +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +public class StandardAnalysisContext implements AnalysisContext {
    +
    +    private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class);
    +    private final NiFiFlow nifiFlow;
    +    private final ClusterResolver clusterResolver;
    +    private final ProvenanceRepository provenanceRepository;
    +
    +    public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver,
    +                                   ProvenanceRepository provenanceRepository) {
    +        this.nifiFlow = nifiFlow;
    +        this.clusterResolver = clusterResolver;
    +        this.provenanceRepository = provenanceRepository;
    +    }
    +
    +    @Override
    +    public List<ConnectionStatus> findConnectionTo(String componentId) {
    +        return nifiFlow.getIncomingRelationShips(componentId);
    +    }
    +
    +    @Override
    +    public List<ConnectionStatus> findConnectionFrom(String componentId) {
    +        return nifiFlow.getOutgoingRelationShips(componentId);
    +    }
    +
    +    @Override
    +    public String getNiFiClusterName() {
    +        return nifiFlow.getClusterName();
    +    }
    +
    +    @Override
    +    public ClusterResolver getClusterResolver() {
    +        return clusterResolver;
    +    }
    +
    +    private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) {
    +        final ComputeLineageResult result = submission.getResult();
    +        try {
    +            if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
    +                return result;
    +            }
    +            logger.warn("Lineage query for {} timed out.", new Object[]{eventId});
    +        } catch (InterruptedException e) {
    +            logger.warn("Lineage query for {} was interrupted due to {}.", new Object[]{eventId, e}, e);
    +        } finally {
    +            submission.cancel();
    +        }
    +
    +        return null;
    +    }
    +
    +    @Override
    +    public ComputeLineageResult queryLineage(long eventId) {
    +        final ComputeLineageSubmission submission = provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
    +        return getLineageResult(eventId, submission);
    +    }
    +
    +    public ComputeLineageResult findParents(long eventId) {
    +        final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER);
    +        return getLineageResult(eventId, submission);
    +    }
    +
    +    // NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation
    +    private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
    --- End diff --
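The `getLineageResult` method quoted above follows a common pattern. It waits a bounded time for an asynchronous computation, always cancels it in `finally`, and returns `null` when the computation does not complete. The Java below is a sketch of the same pattern that uses plain `java.util.concurrent` types in place of NiFi's `ComputeLineageSubmission` and `ComputeLineageResult`; the helper name `awaitOrCancel` is hypothetical. Unlike the quoted code, this sketch also restores the thread's interrupt flag, which is a common convention rather than something the PR does.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWait {

    // Waits up to 'timeout' for the result; returns null on timeout, interruption or failure.
    // The future is always cancelled afterwards (a no-op if it already completed).
    static <T> T awaitOrCancel(Future<T> future, long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            System.err.println("Query timed out.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for callers
            System.err.println("Query was interrupted.");
        } catch (ExecutionException e) {
            System.err.println("Query failed: " + e.getCause());
        } finally {
            future.cancel(true);
        }
        return null;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> fast = executor.submit(() -> "lineage-result");
            System.out.println(awaitOrCancel(fast, 10, TimeUnit.SECONDS)); // prints "lineage-result"
            Future<String> slow = executor.submit(() -> {
                Thread.sleep(5_000);
                return "too late";
            });
            System.out.println(awaitOrCancel(slow, 100, TimeUnit.MILLISECONDS)); // prints "null"
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Cancelling in `finally` ensures that a timed-out query does not keep consuming resources after the caller has given up on it.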
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292857#comment-16292857 ]

ASF GitHub Bot commented on NIFI-3709:
--------------------------------------

Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2335

    @markap14 @dmkoster Thank you very much for your informative review feedback! I've addressed all comments other than confirming the Provenance Event level authorization. I will keep digging into that.
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292747#comment-16292747 ]

ASF GitHub Bot commented on NIFI-3709:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2335#discussion_r157240784

    --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.atlas.provenance.lineage;
    +
    +import org.apache.atlas.typesystem.Referenceable;
    +import org.apache.nifi.atlas.NiFiFlow;
    +import org.apache.nifi.atlas.NiFiFlowPath;
    +import org.apache.nifi.atlas.provenance.AnalysisContext;
    +import org.apache.nifi.atlas.provenance.DataSetRefs;
    +import org.apache.nifi.controller.status.ConnectionStatus;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.provenance.lineage.ComputeLineageResult;
    +import org.apache.nifi.provenance.lineage.LineageEdge;
    +import org.apache.nifi.provenance.lineage.LineageNode;
    +import org.apache.nifi.provenance.lineage.LineageNodeType;
    +
    +import java.util.List;
    +
    +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
    +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
    +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
    +
    +public class SimpleFlowPathLineage extends AbstractLineageStrategy {
    +
    +    @Override
    +    public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
    +        final DataSetRefs refs = executeAnalyzer(analysisContext, event);
    +        if (refs == null || (refs.isEmpty())) {
    +            return;
    +        }
    +
    +        if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
    +            processRemotePortEvent(analysisContext, nifiFlow, event, refs);
    +        } else {
    +            addDataSetRefs(nifiFlow, refs);
    +        }
    +
    +    }
    +
    +    /**
    +     * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event are received.
    +     * Because such entity can not be created in advance while analyzing flow statically,
    +     * as ReportingTask can not determine whether a component id is a RemoteGroupPort,
    +     * since connectionStatus is the only available information in ReportingContext.
    +     * ConnectionStatus only knows component id, component type is unknown.
    +     * For example, there is no difference to tell if a connected component is a funnel or a RemoteGroupPort.
    +     */
    +    private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
    +
    +        final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
    +
    +        // Create a RemoteInputPort Process.
    +        // event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
    +        // See NIFI-4571 for detail.
    +        final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
    +        final String portProcessId = event.getComponentId();
    +
    +        final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
    +        remotePortProcess.setName(event.getComponentType());
    +        remotePortProcess.addProcessor(portProcessId);
    +
    +        // For RemoteInputPort, need to find the previous component connected to this port,
    +        // which passed this particular FlowFile.
    +        // That is only possible by calling lineage API.
    +        if (isRemoteInputPort) {
    +            final P
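The Java below restates only the visible part of `processRemotePortEvent` in the diff above, as a sketch. The hypothetical records `Refs` and `FlowPath` stand in for `DataSetRefs` and `NiFiFlowPath`, and plain strings stand in for Atlas `Referenceable` objects. The sketch captures two ideas from the diff. A Remote Input Port sends data to the remote instance, so its port dataset is taken from the outputs, while a Remote Output Port receives data, so its port dataset comes from the inputs. The flow path is keyed by the event's component id, the client-side RemoteGroupPort UUID, which differs from the remote port's UUID.

```java
import java.util.List;

public class RemotePortSketch {

    // Hypothetical simplified stand-ins for DataSetRefs and NiFiFlowPath.
    record Refs(List<String> inputs, List<String> outputs) {}

    record FlowPath(String id, String name, List<String> processorIds) {}

    // Mirrors the component-type check in processEvent.
    static boolean isRemotePort(String componentType) {
        return "Remote Input Port".equals(componentType) || "Remote Output Port".equals(componentType);
    }

    // A Remote Input Port sends to the remote instance, so the port dataset is an output;
    // a Remote Output Port receives from it, so the port dataset is an input.
    static String remotePortDataSet(String componentType, Refs refs) {
        boolean isRemoteInputPort = "Remote Input Port".equals(componentType);
        return isRemoteInputPort ? refs.outputs().get(0) : refs.inputs().get(0);
    }

    // The flow path uses the event's component id (the client-side RemoteGroupPort UUID)
    // as its id, is named after the component type, and contains that single component.
    static FlowPath remotePortFlowPath(String componentId, String componentType) {
        return new FlowPath(componentId, componentType, List.of(componentId));
    }

    public static void main(String[] args) {
        Refs sendRefs = new Refs(List.of(), List.of("remote-input-port@cluster-b"));
        System.out.println(remotePortDataSet("Remote Input Port", sendRefs));
        System.out.println(remotePortFlowPath("rgp-client-uuid", "Remote Input Port"));
    }
}
```

The lineage lookup that the diff performs for Remote Input Ports, to find the upstream component that passed the FlowFile, is not modelled here.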
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292739#comment-16292739 ]

ASF GitHub Bot commented on NIFI-3709:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2335#discussion_r157238842

    --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java ---
    @@ -0,0 +1,136 @@
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292728#comment-16292728 ]

ASF GitHub Bot commented on NIFI-3709:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2335#discussion_r157237190

    --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java ---
    @@ -0,0 +1,205 @@
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292724#comment-16292724 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157236574 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public class StandardAnalysisContext implements AnalysisContext { + +private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class); +private final NiFiFlow nifiFlow; +private final ClusterResolver clusterResolver; +private final ProvenanceRepository provenanceRepository; + +public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver, + ProvenanceRepository provenanceRepository) { +this.nifiFlow = nifiFlow; +this.clusterResolver = clusterResolver; +this.provenanceRepository = provenanceRepository; +} + +@Override +public List<ConnectionStatus> findConnectionTo(String componentId) { +return nifiFlow.getIncomingRelationShips(componentId); +} + +@Override +public List<ConnectionStatus> findConnectionFrom(String componentId) { +return nifiFlow.getOutgoingRelationShips(componentId); +} + +@Override +public String getNiFiClusterName() { +return nifiFlow.getClusterName(); +} + +@Override +public ClusterResolver getClusterResolver() { +return clusterResolver; +} + +private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) { +final ComputeLineageResult result = submission.getResult(); +try { +if (result.awaitCompletion(10, TimeUnit.SECONDS)) { +return result; +} +logger.warn("Lineage 
query for {} timed out.", new Object[]{eventId}); +} catch (InterruptedException e) { +logger.warn("Lineage query for {} was interrupted due to {}.", new Object[]{eventId, e}, e); +} finally { +submission.cancel(); +} + +return null; +} + +@Override +public ComputeLineageResult queryLineage(long eventId) { +final ComputeLineageSubmission submission = provenanceRepository.submitLineageComputation(eventId, NIFI_USER); +return getLineageResult(eventId, submission); +} + +public ComputeLineageResult findParents(long eventId) { +final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER); +return getLineageResult(eventId, submission); +} + +// NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation +private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser(); --- End diff --
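The bounded wait in `getLineageResult` (wait up to 10 seconds, return null on timeout or interruption, always cancel the submission in `finally`) is a general pattern. The sketch below uses `java.util.concurrent.Future` as a stand-in for NiFi's `ComputeLineageSubmission`/`ComputeLineageResult`, whose `awaitCompletion(10, TimeUnit.SECONDS)` returns a boolean instead of throwing `TimeoutException`. Unlike the diff, the sketch also restores the thread's interrupt flag, a common Java convention; `awaitOrNull` is a hypothetical name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedLineageWait {

    /**
     * Waits up to timeoutMillis for the query result; returns null on timeout, interruption or failure.
     * The submission is always cancelled in finally, as getLineageResult does with submission.cancel().
     */
    static <T> T awaitOrNull(ExecutorService executor, Callable<T> query, long timeoutMillis) {
        final Future<T> submission = executor.submit(query);
        try {
            return submission.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.err.println("Lineage query timed out.");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for callers
            System.err.println("Lineage query was interrupted.");
        } catch (ExecutionException e) {
            System.err.println("Lineage query failed: " + e.getCause());
        } finally {
            submission.cancel(true); // no-op if the query already completed
        }
        return null;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(awaitOrNull(executor, () -> "lineage-42", 1000));
            System.out.println(awaitOrNull(executor, () -> { Thread.sleep(5000); return "late"; }, 100));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Cancelling in `finally` keeps a slow query from occupying a worker after the caller has given up, which is the same reason the diff calls `submission.cancel()` unconditionally.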
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292712#comment-16292712 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157234799 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.AtlasNiFiFlowLineage/additionalDetails.html --- @@ -0,0 +1,541 @@ + + + + + +AtlasNiFiFlowLineage + + + + +AtlasNiFiFlowLineage + +Table of contents: + + +Information reported to Atlas +NiFi Atlas Types +Cluster Name Resolution +NiFi flow structure + +Path Separation Logic + + +NiFi data lineage + +NiFi Lineage Strategy +NiFi Provenance Event Analysis +Supported DataSets and Processors + + +How it runs in NiFi cluster +Limitations +Atlas Server Configurations +Atlas Server Emulator + + +Information reported to Atlas +This reporting task stores two types of NiFi flow information, 'NiFi flow structure' and 'NiFi data lineage'. + +'NiFi flow structure' tells which components are running within a NiFi flow and how they are connected. It is reported by analyzing the current NiFi flow structure, specifically NiFi component relationships. + +'NiFi data lineage' tells which parts of a NiFi flow interact with different DataSets, such as HDFS files or Hive tables. It is reported by analyzing NiFi provenance events. + + + +Technically, each type of information is sent using a different protocol, Atlas REST API v2 and Notification via a Kafka topic, as shown in the image above. + + +As both information types use the same NiFi Atlas Types and Cluster Name Resolution concepts, it is recommended to read those sections first. + +NiFi Atlas Types + +When it runs, this reporting task creates the following NiFi-specific types in the Atlas type system if these type definitions are not found.
+ +Green boxes represent sub-types of DataSet and blue ones are sub-types of Process. Gray lines represent entity ownership. +Red lines represent lineage. + + + + +nifi_flow +Represents a NiFi data flow. +As shown in the above diagram, nifi_flow owns other nifi_component types. +This ownership is defined by the Atlas 'owned' constraint, so that when a 'nifi_flow' entity is removed, all owned NiFi component entities are removed in a cascading manner. +When this reporting task runs, it analyzes and traverses the entire flow structure and creates NiFi component entities in Atlas. +On later runs, it compares the current flow structure with the one stored in Atlas to determine whether any changes have been made since the flow was last reported. The reporting task updates NiFi component entities in Atlas if needed. +NiFi components that are removed from a NiFi flow are also deleted from Atlas. +However, those entities can still be seen in Atlas search results or lineage graphs, since Atlas uses 'Soft Delete' by default. +See Atlas Delete Handler for further details. + +Attributes: + +qualifiedName: Root ProcessGroup ID@clusterName (e.g. 86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1) +name: Name of the Root ProcessGroup. +url: URL of the NiFi instance. This can be specified via the reporting task's 'NiFi URL for Atlas' property. + + + +nifi_flow_path: Part of a NiFi data flow containing one or more processing NiFi components, such as Processors and RemoteGroupPorts. The reporting task divides a NiFi flow into multiple flow paths. See Path Separation Logic for details. +Attributes: + +qualifiedName: ID of the first NiFi component in the path@clusterName (e.g. 529e6722-9b49-3b66-9c94-00da9863ca2d@cl1) +name: Names of the NiFi components within the path, concatenated (e.g. GenerateFlowFile, PutFile, LogAttribute) +
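The `qualifiedName` and `name` formats listed above can be illustrated with a few string helpers. This is a sketch, not the reporting task's code: the `", "` separator is inferred from the example `GenerateFlowFile, PutFile, LogAttribute`, and `componentIdOf` is a hypothetical inverse helper that relies on NiFi component IDs being UUIDs, which contain no `@`.

```java
import java.util.Arrays;
import java.util.List;

final class NiFiAtlasNaming {

    /** Builds "<componentId>@<clusterName>": the Root ProcessGroup ID for nifi_flow, the first component ID for nifi_flow_path. */
    static String qualifiedName(String componentId, String clusterName) {
        return componentId + "@" + clusterName;
    }

    /** Hypothetical inverse: the part before the first '@' (UUIDs contain no '@'). */
    static String componentIdOf(String qualifiedName) {
        final int at = qualifiedName.indexOf('@');
        return at < 0 ? qualifiedName : qualifiedName.substring(0, at);
    }

    /** nifi_flow_path name: component names in the path, joined with an assumed ", " separator. */
    static String flowPathName(List<String> componentNames) {
        return String.join(", ", componentNames);
    }

    public static void main(String[] args) {
        // prints 86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1
        System.out.println(qualifiedName("86420a14-2fab-3e1e-4331-fb6ab42f58e0", "cl1"));
        // prints GenerateFlowFile, PutFile, LogAttribute
        System.out.println(flowPathName(Arrays.asList("GenerateFlowFile", "PutFile", "LogAttribute")));
    }
}
```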
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292695#comment-16292695 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157232014 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.provenance.ProvenanceEventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; + +public class NiFiProvenanceEventAnalyzerFactory { + +private static final Logger logger = LoggerFactory.getLogger(NiFiProvenanceEventAnalyzerFactory.class); +private static final Map<Pattern, NiFiProvenanceEventAnalyzer> analyzersForComponentType = new ConcurrentHashMap<>(); +private static final Map<Pattern, NiFiProvenanceEventAnalyzer> analyzersForTransitUri = new ConcurrentHashMap<>(); +private static final Map<ProvenanceEventType, NiFiProvenanceEventAnalyzer> analyzersForProvenanceEventType = new ConcurrentHashMap<>(); +private static boolean loaded = false; + +private static void loadAnalyzers() { +logger.debug("Loading NiFiProvenanceEventAnalyzer ..."); +final ServiceLoader<NiFiProvenanceEventAnalyzer> serviceLoader += ServiceLoader.load(NiFiProvenanceEventAnalyzer.class); +serviceLoader.forEach(analyzer -> { +addAnalyzer(analyzer.targetComponentTypePattern(), analyzersForComponentType, analyzer); +addAnalyzer(analyzer.targetTransitUriPattern(), analyzersForTransitUri, analyzer); +final ProvenanceEventType eventType = analyzer.targetProvenanceEventType(); +if (eventType != null) { +if (analyzersForProvenanceEventType.containsKey(eventType)) { +logger.warn("For ProvenanceEventType {}, an Analyzer {} is already assigned." + +" Only one analyzer for a type can be registered. 
Ignoring {}", +eventType, analyzersForProvenanceEventType.get(eventType), analyzer); +} +analyzersForProvenanceEventType.put(eventType, analyzer); +} +}); +logger.info("Loaded NiFiProvenanceEventAnalyzers: componentTypes={}, transitUris={}", analyzersForComponentType, analyzersForTransitUri); +} + +private static void addAnalyzer(String patternStr, Map<Pattern, NiFiProvenanceEventAnalyzer> toAdd, +NiFiProvenanceEventAnalyzer analyzer) { +if (patternStr != null && !patternStr.isEmpty()) { +Pattern pattern = Pattern.compile(patternStr.trim()); +toAdd.put(pattern, analyzer); +} +} + +/** + * Find and retrieve the NiFiProvenanceEventAnalyzer implementation for the specified targets. + * Pattern matching is performed in the following order, and the first match is returned: + * + * Component type name. Use an analyzer supporting the component type with its {@link NiFiProvenanceEventAnalyzer#targetComponentTypePattern()}. + * TransitUri. Use an analyzer supporting the TransitUri with its {@link NiFiProvenanceEventAnalyzer#targetTransitUriPattern()}. + * Provenance Event Type. Use an analyzer supporting the Provenance Event Type with its {@link NiFiProvenanceEventAnalyzer#targetProvenanceEventType()}. + * + * @param typeName NiFi component type name. + * @param transitUri Transit URI. + * @param eventType Provenance event type. + * @return Instance of NiFiProvenanceEventAnalyzer if one is found for the specified targets, otherwise null. + */ +
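The lookup order in the Javadoc above (component type, then transit URI, then provenance event type, first match wins) can be sketched as follows. Several details are assumptions rather than the factory's actual code. Analyzers are represented as plain strings, and event types are strings instead of `ProvenanceEventType`. The maps are `LinkedHashMap`s for deterministic iteration, whereas the diff uses `ConcurrentHashMap`. Matching uses full-string `Matcher.matches()`, while the real factory may match differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class AnalyzerLookupSketch {

    // Hypothetical stand-ins: analyzers are identified by name, event types are plain strings.
    private final Map<Pattern, String> byComponentType = new LinkedHashMap<>();
    private final Map<Pattern, String> byTransitUri = new LinkedHashMap<>();
    private final Map<String, String> byEventType = new LinkedHashMap<>();

    void registerComponentType(String regex, String analyzer) {
        byComponentType.put(Pattern.compile(regex.trim()), analyzer);
    }

    void registerTransitUri(String regex, String analyzer) {
        byTransitUri.put(Pattern.compile(regex.trim()), analyzer);
    }

    void registerEventType(String eventType, String analyzer) {
        byEventType.put(eventType, analyzer); // a later registration replaces an earlier one
    }

    /** Component type first, then transit URI, then event type; null if nothing matches. */
    String find(String typeName, String transitUri, String eventType) {
        String found = firstMatch(byComponentType, typeName);
        if (found == null) {
            found = firstMatch(byTransitUri, transitUri);
        }
        if (found == null && eventType != null) {
            found = byEventType.get(eventType);
        }
        return found;
    }

    private static String firstMatch(Map<Pattern, String> analyzers, String value) {
        if (value == null) {
            return null;
        }
        for (Map.Entry<Pattern, String> entry : analyzers.entrySet()) {
            if (entry.getKey().matcher(value).matches()) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        AnalyzerLookupSketch factory = new AnalyzerLookupSketch();
        factory.registerComponentType("^(Put|Fetch)HDFS$", "HdfsAnalyzer"); // hypothetical names
        factory.registerTransitUri("^kafka://.+$", "KafkaAnalyzer");
        factory.registerEventType("CREATE", "CreateAnalyzer");
        System.out.println(factory.find("PutHDFS", "hdfs://nn/data", "SEND"));
        System.out.println(factory.find("PublishKafka", "kafka://broker/topic", "SEND"));
    }
}
```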
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292694#comment-16292694 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157231905 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java --- @@ -0,0 +1,105 @@
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292663#comment-16292663 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157223751 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java --- @@ -0,0 +1,205 @@
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292661#comment-16292661 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157222944 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java --- @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.nifi.atlas.security.AtlasAuthN; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MultivaluedMap; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.toStr; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS; 
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL; +import static org.apache.nifi.atlas.NiFiTypes.ENTITIES; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class NiFiAtlasClient { + +private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class); + +private static NiFiAtlasClient nifiClient; +private AtlasClientV2 atlasClient; + +private NiFiAtlasClient() { +super(); +}
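The statically imported `EntityChangeType` constants (`AS_IS`, `CREATED`, `UPDATED`, `DELETED`) match the comparison described in the reporting task documentation, where each run compares the current flow with what is stored in Atlas. The following is a minimal sketch that assumes entities are keyed by qualifiedName and compared by a single string value. The real client compares Atlas entities and their attributes, and this local enum only mirrors the constant names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

final class FlowChangeSketch {

    // Mirrors the constant names imported from NiFiFlow.EntityChangeType; not the real enum.
    enum EntityChangeType { AS_IS, CREATED, UPDATED, DELETED }

    /**
     * Classifies entities by qualifiedName. Map values stand in for the compared attributes
     * (here just the entity name).
     */
    static Map<String, EntityChangeType> classify(Map<String, String> current, Map<String, String> inAtlas) {
        final Map<String, EntityChangeType> changes = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : current.entrySet()) {
            if (!inAtlas.containsKey(e.getKey())) {
                changes.put(e.getKey(), EntityChangeType.CREATED); // new in the flow
            } else if (Objects.equals(e.getValue(), inAtlas.get(e.getKey()))) {
                changes.put(e.getKey(), EntityChangeType.AS_IS);   // unchanged
            } else {
                changes.put(e.getKey(), EntityChangeType.UPDATED); // attributes differ
            }
        }
        for (String qualifiedName : inAtlas.keySet()) {
            if (!current.containsKey(qualifiedName)) {
                changes.put(qualifiedName, EntityChangeType.DELETED); // removed from the flow
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> current = new LinkedHashMap<>();
        current.put("p1@cl1", "GenerateFlowFile, PutFile");
        current.put("p2@cl1", "PutHDFS");
        Map<String, String> inAtlas = new LinkedHashMap<>();
        inAtlas.put("p1@cl1", "GenerateFlowFile, PutFile");
        inAtlas.put("p3@cl1", "LogAttribute");
        System.out.println(classify(current, inAtlas));
    }
}
```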
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292658#comment-16292658 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157222062 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.nifi.atlas.provenance.lineage.LineageContext; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE; +import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; + +/** + * This class is not thread-safe as it holds uncommitted notification messages within instance. + * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread. 
+ */ +public class NiFIAtlasHook extends AtlasHook implements LineageContext { + +public static final String NIFI_USER = "nifi"; + +private static final Logger logger = LoggerFactory.getLogger(NiFIAtlasHook.class); +private static final String CONF_PREFIX = "atlas.hook.nifi."; +private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; + +private final NiFiAtlasClient atlasClient; + +/** + * An index to resolve a qualifiedName from a GUID. + */ +private final Map<String, String> guidToQualifiedName; +/** + * An index to resolve a Referenceable from a typeName::qualifiedName. + */ +private final Map<String, Referenceable> typedQualifiedNameToRef; + + +private static <K, V> Map<K, V> createCache(final int maxSize) { +return new LinkedHashMap<K, V>(maxSize, 0.75f, true) { --- End diff -- Well, it's just a private method to create cache instances within this class. So if different default optimizations are needed, we can make them here as well without affecting other code.
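`createCache` builds a `LinkedHashMap` with `accessOrder = true`. The quoted diff stops at the constructor call, so the sketch below assumes that the anonymous subclass overrides `removeEldestEntry` to cap the size. Overriding `removeEldestEntry` is the standard way to turn an access-ordered `LinkedHashMap` into an LRU cache. The keys and values in `main` are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LruCacheSketch {

    /**
     * accessOrder = true keeps entries ordered from least to most recently accessed;
     * removeEldestEntry (assumed, not shown in the diff) evicts the least recent entry
     * once the map grows past maxSize.
     */
    static <K, V> Map<K, V> createCache(final int maxSize) {
        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> guidToQualifiedName = createCache(2);
        guidToQualifiedName.put("guid-1", "a@cl1");
        guidToQualifiedName.put("guid-2", "b@cl1");
        guidToQualifiedName.get("guid-1");                // touch guid-1 so guid-2 becomes the eldest
        guidToQualifiedName.put("guid-3", "c@cl1");       // evicts guid-2
        System.out.println(guidToQualifiedName.keySet()); // prints [guid-1, guid-3]
    }
}
```

Like the class it sketches, this cache is not thread-safe. Because `get` reorders entries in an access-ordered map, even reads must come from a single thread or be synchronized externally.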
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292657#comment-16292657 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157221674 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.nifi.atlas.provenance.lineage.LineageContext; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE; +import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; + +/** + * This class is not thread-safe as it holds uncommitted notification messages within instance. + * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread. + */ +public class NiFIAtlasHook extends AtlasHook implements LineageContext { + +public static final String NIFI_USER = "nifi"; --- End diff -- Yes, it would. However, at this moment, this user is not significant. 
Please see this conversation on the same subject. https://github.com/apache/nifi/pull/2335#discussion_r156992235
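The Javadoc quoted in this diff says `NiFIAtlasHook` is not thread-safe because it holds uncommitted notification messages in the instance. For that reason, `addMessage` and `commitMessages` must be called serially from a single thread. The following is a minimal, hypothetical Java sketch of that buffer-then-commit pattern. `PendingMessages`, `add`, and `commit` are illustrative names and are not the hook's actual API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch (not NiFIAtlasHook itself): messages are buffered in an
 * unsynchronized ArrayList and flushed as one batch. Because nothing is
 * synchronized, add() and commit() must be called from one thread, in order.
 */
public class PendingMessages<T> {

    // Uncommitted messages held in instance state, as the Javadoc describes.
    private final List<T> uncommitted = new ArrayList<>();

    public void add(T message) {
        uncommitted.add(message);
    }

    /** Hands a read-only copy of the buffer to the sender, then clears it. Returns the batch size. */
    public int commit(Consumer<List<T>> sender) {
        if (uncommitted.isEmpty()) {
            return 0;
        }
        final List<T> batch = Collections.unmodifiableList(new ArrayList<>(uncommitted));
        sender.accept(batch);  // if this throws, the buffer is left intact
        uncommitted.clear();
        return batch.size();
    }

    public static void main(String[] args) {
        PendingMessages<String> pending = new PendingMessages<>();
        pending.add("message-1");
        pending.add("message-2");
        int sent = pending.commit(batch -> batch.forEach(System.out::println));
        System.out.println(sent + " message(s) committed");
    }
}
```

In this sketch, the buffer is cleared only after the sender returns, so a failed send leaves the messages buffered for the next attempt. The thread does not say whether the real hook behaves this way.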
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292654#comment-16292654 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157221249 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.nifi.atlas.provenance.lineage.LineageContext; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE; +import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; + +/** + * This class is not thread-safe as it holds uncommitted notification messages within instance. + * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread. + */ +public class NiFIAtlasHook extends AtlasHook implements LineageContext { --- End diff -- Good catch, thank you! I'll fix this. 
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292652#comment-16292652 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157221167 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/AtlasUtils.java --- @@ -0,0 +1,77 @@ +package org.apache.nifi.atlas; + +import org.apache.atlas.model.instance.AtlasObjectId; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; + +public class AtlasUtils { + +public static String toStr(Object obj) { +return obj != null ? obj.toString() : null; +} + + +public static boolean isGuidAssigned(String guid) { +return guid != null && !guid.startsWith("-"); --- End diff -- Yes, this relies on an Atlas internal implementation detail. However, checking for null alone is not sufficient, because entities that were created on the client side but are not yet registered in Atlas have negative GUIDs.
I'd like to keep it this way if there are no strong objections. Thank you.
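The reply explains why `isGuidAssigned` checks for a leading `-` as well as for null. Entities created on the client but not yet registered in Atlas carry negative placeholder GUIDs. The following hypothetical Java sketch reproduces the same predicate from the diff and exercises it against three cases. `nextPlaceholderGuid` is an assumed stand-in for however a client mints placeholder ids; it is not an Atlas API.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

public class GuidCheckSketch {

    // Hypothetical placeholder generator: a negative number rendered as a
    // string, e.g. "-1", standing in for a client-side, unregistered entity.
    private static final AtomicLong NEXT_ID = new AtomicLong(1);

    public static String nextPlaceholderGuid() {
        return "-" + NEXT_ID.getAndIncrement();
    }

    // Same predicate as AtlasUtils.isGuidAssigned in the diff:
    // null means no id at all; a leading '-' means a client-side placeholder.
    public static boolean isGuidAssigned(String guid) {
        return guid != null && !guid.startsWith("-");
    }

    public static void main(String[] args) {
        System.out.println(isGuidAssigned(null));                          // false
        System.out.println(isGuidAssigned(nextPlaceholderGuid()));         // false
        System.out.println(isGuidAssigned(UUID.randomUUID().toString()));  // true
    }
}
```

A UUID string is used here only as an example of a server-assigned id, because its canonical form starts with a hex digit and never with `-`.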
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292649#comment-16292649 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157220655 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java --- @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.nifi.atlas.security.AtlasAuthN; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MultivaluedMap; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.toStr; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS; 
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL; +import static org.apache.nifi.atlas.NiFiTypes.ENTITIES; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class NiFiAtlasClient { + +private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class); + +private static NiFiAtlasClient nifiClient; +private AtlasClientV2 atlasClient; + +private NiFiAtlasClient() { +super(); +}
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292648#comment-16292648 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157219892 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java --- @@ -0,0 +1,136 @@
+package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public class StandardAnalysisContext implements AnalysisContext { + +private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class); +private final NiFiFlow nifiFlow; +private final ClusterResolver clusterResolver; +private final ProvenanceRepository provenanceRepository; + +public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver, + ProvenanceRepository provenanceRepository) { +this.nifiFlow = nifiFlow; +this.clusterResolver = clusterResolver; +this.provenanceRepository = provenanceRepository; +} + +@Override +public List<ConnectionStatus> findConnectionTo(String componentId) { +return nifiFlow.getIncomingRelationShips(componentId); +} + +@Override +public List<ConnectionStatus> findConnectionFrom(String componentId) { +return nifiFlow.getOutgoingRelationShips(componentId); +} + +@Override +public String getNiFiClusterName() { +return nifiFlow.getClusterName(); +} + +@Override +public ClusterResolver getClusterResolver() { +return clusterResolver; +} + +private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) { +final ComputeLineageResult result = submission.getResult(); +try { +if (result.awaitCompletion(10, TimeUnit.SECONDS)) { --- End diff -- I agree that this can be a costly operation.
The reason to query provenance is to compute lineage from a DROP provenance event, which is what the 'Complete Path' strategy does. I have documented its performance impact. If it does not suit a use case, the user can choose the other strategy, 'Simple Path'. Simple Path does not query provenance events this way; it analyzes each event individually, so it should be more lightweight.
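The diff guards the potentially costly lineage query with `awaitCompletion(10, TimeUnit.SECONDS)`, so the reporting task waits only a bounded time for the result. The same guard can be sketched with standard `java.util.concurrent` types. This sketch does not use NiFi's provenance API, and `BoundedLineageWaitSketch` and `awaitResult` are hypothetical names. It shows the idea of giving up after a timeout instead of blocking the calling thread indefinitely.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedLineageWaitSketch {

    // Waits at most `timeout` for an asynchronous computation and returns
    // empty if it does not finish in time, fails, or the caller is interrupted.
    public static <T> Optional<T> awaitResult(Future<T> future, long timeout, TimeUnit unit) {
        try {
            return Optional.ofNullable(future.get(timeout, unit));
        } catch (TimeoutException e) {
            future.cancel(true);  // stop work whose result will no longer be used
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
            return Optional.empty();
        } catch (ExecutionException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Hypothetical "lineage computation" that finishes quickly.
            Future<List<Long>> fast = pool.submit(() -> List.of(1L, 2L, 3L));
            System.out.println(awaitResult(fast, 1, TimeUnit.SECONDS));  // Optional[[1, 2, 3]]

            // Hypothetical computation that outlives the timeout.
            Future<List<Long>> slow = pool.submit(() -> {
                Thread.sleep(5_000);
                return List.of(4L);
            });
            System.out.println(awaitResult(slow, 100, TimeUnit.MILLISECONDS));  // Optional.empty
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Cancelling the timed-out future is a choice made in this sketch. The thread does not say what the 'Complete Path' implementation does with a query that misses its deadline.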
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292640#comment-16292640 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157217791 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java --- @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.reporting; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.atlas.NiFIAtlasHook; +import org.apache.nifi.atlas.NiFiAtlasClient; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowAnalyzer; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.StandardAnalysisContext; +import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage; +import org.apache.nifi.atlas.provenance.lineage.LineageStrategy; +import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage; +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.RegexClusterResolver; +import org.apache.nifi.atlas.security.AtlasAuthN; +import org.apache.nifi.atlas.security.Basic; +import org.apache.nifi.atlas.security.Kerberos; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import 
org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer; +import org.apache.nifi.ssl.SSLContextService; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import static org.apache.commons.lang
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292639#comment-16292639 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157217462 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java --- @@ -0,0 +1,714 @@
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292626#comment-16292626 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157216281 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java --- @@ -0,0 +1,714 @@
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292625#comment-16292625 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157216102 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java --- @@ -0,0 +1,714 @@
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292614#comment-16292614 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157214953 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java --- @@ -0,0 +1,714 @@
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292612#comment-16292612 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157214860 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowPath; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.LineageEdge; +import org.apache.nifi.provenance.lineage.LineageNode; +import org.apache.nifi.provenance.lineage.LineageNodeType; + +import java.util.List; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class SimpleFlowPathLineage extends AbstractLineageStrategy { + +@Override +public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) { +final DataSetRefs refs = executeAnalyzer(analysisContext, event); +if (refs == null || (refs.isEmpty())) { +return; +} + +if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) { +processRemotePortEvent(analysisContext, nifiFlow, event, refs); +} else { +addDataSetRefs(nifiFlow, refs); +} + +} + +/** + * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event are received. + * Because such entity can not be created in advance while analyzing flow statically, + * as ReportingTask can not determine whether a component id is a RemoteGroupPort, + * since connectionStatus is the only available information in ReportingContext. + * ConnectionStatus only knows component id, component type is unknown. + * For example, there is no difference to tell if a connected component is a funnel or a RemoteGroupPort. 
+ */ +private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) { + +final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType()); + +// Create a RemoteInputPort Process. +// event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid). +// See NIFI-4571 for detail. +final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next(); +final String portProcessId = event.getComponentId(); + +final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId); +remotePortProcess.setName(event.getComponentType()); +remotePortProcess.addProcessor(portProcessId); + +// For RemoteInputPort, need to find the previous component connected to this port, +// which passed this particular FlowFile. +// That is only possible by calling lineage API. +if (isRemoteInputPort) { +final P
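The Javadoc quoted above says that, for a Remote Input Port, the component that passed a particular FlowFile to the port can only be found through the lineage API. A rough sketch of that kind of lookup, assuming a lineage result can be reduced to a list of directed source-to-destination edges between node IDs: walk the edges backward, breadth first, from the SEND event until a node of the wanted kind appears. `LineageEdgeSketch`, `UpstreamLookupSketch` and `findNearestUpstream` are hypothetical names for illustration only, not NiFi's `LineageEdge`/`LineageNode` API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical, simplified lineage edge: a directed link between two node IDs. */
final class LineageEdgeSketch {
    final String source;
    final String destination;

    LineageEdgeSketch(String source, String destination) {
        this.source = source;
        this.destination = destination;
    }
}

/** Hypothetical helper; not part of NiFi. */
final class UpstreamLookupSketch {

    private UpstreamLookupSketch() {
    }

    /**
     * Walks the edges against their direction, breadth first, starting at start,
     * and returns the closest upstream node ID accepted by wanted, or null if none exists.
     */
    static String findNearestUpstream(List<LineageEdgeSketch> edges, String start, Predicate<String> wanted) {
        final Deque<String> queue = new ArrayDeque<>();
        final Set<String> visited = new HashSet<>();
        queue.add(start);
        visited.add(start);
        while (!queue.isEmpty()) {
            final String current = queue.poll();
            for (LineageEdgeSketch edge : edges) {
                // Follow only edges that end at the current node, and visit each node once.
                if (edge.destination.equals(current) && visited.add(edge.source)) {
                    if (wanted.test(edge.source)) {
                        return edge.source;
                    }
                    queue.add(edge.source);
                }
            }
        }
        return null;
    }
}
```

Breadth-first order means the nearest matching upstream node wins, which matches the "previous component" the Javadoc asks for; the visited set keeps the walk finite if the graph ever contains a cycle.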
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292582#comment-16292582 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157208859 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java --- @@ -0,0 +1,537 @@
+package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.nifi.atlas.security.AtlasAuthN; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MultivaluedMap; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.toStr; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL; +import static org.apache.nifi.atlas.NiFiTypes.ENTITIES; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class NiFiAtlasClient { + +private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class); + +private static NiFiAtlasClient nifiClient; +private AtlasClientV2 atlasClient; + +private NiFiAtlasClient() { +super(); +}
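The `private static NiFiAtlasClient nifiClient` field and the private constructor in the quoted hunk point to a singleton. The accessor itself is not part of the quote, so how the instance is actually created is unknown here. If it is created lazily and can be reached from several threads, the initialization-on-demand holder idiom is one common thread-safe option. A minimal sketch, using a hypothetical class name rather than the real `NiFiAtlasClient`:

```java
/** Hypothetical stand-in for a client that should exist once per JVM. */
final class AtlasClientSingletonSketch {

    private AtlasClientSingletonSketch() {
        // Private: callers must go through getInstance().
    }

    /**
     * The JVM initializes Holder only when getInstance() first touches it, and
     * class initialization runs exactly once even when several threads race,
     * so no explicit locking or volatile field is needed.
     */
    private static final class Holder {
        private static final AtlasClientSingletonSketch INSTANCE = new AtlasClientSingletonSketch();
    }

    static AtlasClientSingletonSketch getInstance() {
        return Holder.INSTANCE;
    }
}
```

Compared with a null check on a plain static field, this avoids both the race on first access and the per-call cost of a `synchronized` accessor.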
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292577#comment-16292577 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157208324 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java --- @@ -0,0 +1,537 @@
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292576#comment-16292576 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157208066 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java --- @@ -0,0 +1,136 @@
+package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public class StandardAnalysisContext implements AnalysisContext { + +private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class); +private final NiFiFlow nifiFlow; +private final ClusterResolver clusterResolver; +private final ProvenanceRepository provenanceRepository; + +public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver, + ProvenanceRepository provenanceRepository) { +this.nifiFlow = nifiFlow; +this.clusterResolver = clusterResolver; +this.provenanceRepository = provenanceRepository; +} + +@Override +public List findConnectionTo(String componentId) { +return nifiFlow.getIncomingRelationShips(componentId); +} + +@Override +public List findConnectionFrom(String componentId) { +return nifiFlow.getOutgoingRelationShips(componentId); +} + +@Override +public String getNiFiClusterName() { +return nifiFlow.getClusterName(); +} + +@Override +public ClusterResolver getClusterResolver() { +return clusterResolver; +} + +private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) { +final ComputeLineageResult result = submission.getResult(); +try { +if (result.awaitCompletion(10, TimeUnit.SECONDS)) { +return result; +} +logger.warn("Lineage query for {} timed out.", new
Object[]{eventId}); +} catch (InterruptedException e) { +logger.warn("Lineage query for {} was interrupted due to {}.", new Object[]{eventId, e}, e); +} finally { +submission.cancel(); +} + +return null; +} + +@Override +public ComputeLineageResult queryLineage(long eventId) { +final ComputeLineageSubmission submission = provenanceRepository.submitLineageComputation(eventId, NIFI_USER); +return getLineageResult(eventId, submission); +} + +public ComputeLineageResult findParents(long eventId) { +final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER); +return getLineageResult(eventId, submission); +} + +// NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation +private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser(); --- End diff --
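`getLineageResult` above waits a bounded time for an asynchronous provenance query, returns `null` on timeout or interruption, and always cancels the submission in `finally`. The same pattern can be sketched with plain `java.util.concurrent` types. This is an illustrative stand-in, not NiFi's `ComputeLineageSubmission` API, and `BoundedQuerySketch`/`awaitOrNull` are hypothetical names. Unlike the quoted code, the sketch also restores the thread's interrupt flag after catching `InterruptedException`, which is the usual Java convention when an interrupt is swallowed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper illustrating a bounded wait with guaranteed cancellation. */
final class BoundedQuerySketch {

    private BoundedQuerySketch() {
    }

    /**
     * Submits the query, waits at most the given timeout, and returns its result,
     * or null if it timed out, was interrupted, or failed. The task is always
     * cancelled afterwards; cancelling an already completed task has no effect.
     */
    static <T> T awaitOrNull(ExecutorService executor, Callable<T> query, long timeout, TimeUnit unit) {
        final Future<T> future = executor.submit(query);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            return null; // a real caller would log "timed out" here
        } catch (InterruptedException e) {
            // Re-set the flag so code further up the stack can still see the interrupt.
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            return null; // the query itself threw
        } finally {
            // Mirrors submission.cancel() in the quoted finally block.
            future.cancel(true);
        }
    }
}
```

Cancelling in `finally` keeps a timed-out query from continuing to consume a worker after the caller has given up on it.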
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291502#comment-16291502 ] ASF GitHub Bot commented on NIFI-3709: -- Github user dmkoster commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157042460 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java --- @@ -0,0 +1,537 @@
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291498#comment-16291498 ] ASF GitHub Bot commented on NIFI-3709: -- Github user dmkoster commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157035283 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java --- @@ -0,0 +1,294 @@
+package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.nifi.atlas.provenance.lineage.LineageContext; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE; +import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; + +/** + * This class is not thread-safe as it holds uncommitted notification messages within instance. + * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
+ */ +public class NiFIAtlasHook extends AtlasHook implements LineageContext { + +public static final String NIFI_USER = "nifi"; + +private static final Logger logger = LoggerFactory.getLogger(NiFIAtlasHook.class); +private static final String CONF_PREFIX = "atlas.hook.nifi."; +private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; + +private final NiFiAtlasClient atlasClient; + +/** + * An index to resolve a qualifiedName from a GUID. + */ +private final Map guidToQualifiedName; +/** + * An index to resolve a Referenceable from a typeName::qualifiedName. + */ +private final Map typedQualifiedNameToRef; + + +private static Map createCache(final int maxSize) { +return new LinkedHashMap(maxSize, 0.75f, true) { --- End diff -- The default load factor is currently 0.75. Is there a reason for fixing the value versus allowing future default optimizations?
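One constraint is relevant to the load factor question: in java.util.LinkedHashMap, the only constructor that accepts the accessOrder flag is LinkedHashMap(int initialCapacity, float loadFactor, boolean accessOrder), so an access-ordered map cannot be created without passing some load factor explicitly. The sketch below shows a bounded, access-ordered (LRU) cache in that style. The class name BoundedCaches is hypothetical, and the removeEldestEntry override is an assumption about how the truncated createCache body continues.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: a sketch of a bounded, access-ordered (LRU) cache.
final class BoundedCaches {

    private BoundedCaches() {
    }

    // LinkedHashMap(int, float, boolean) is the only constructor taking accessOrder,
    // so a load factor must be supplied; 0.75f matches the documented HashMap default.
    static <K, V> Map<K, V> createCache(final int maxSize) {
        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
            // Invoked after each put; returning true evicts the least recently accessed entry.
            @Override
            protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }
}
```

Because accessOrder is true, a get() moves an entry to the most recently used position, so eviction removes whichever key was touched least recently rather than the one inserted first.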
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291497#comment-16291497 ] ASF GitHub Bot commented on NIFI-3709: -- Github user dmkoster commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157034065 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.nifi.atlas.provenance.lineage.LineageContext; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE; +import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; + +/** + * This class is not thread-safe as it holds uncommitted notification messages within instance. + * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread. + */ +public class NiFIAtlasHook extends AtlasHook implements LineageContext { + +public static final String NIFI_USER = "nifi"; --- End diff -- Would this be best offered as a configurable value? 
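One way to answer the reviewer's question is to read the user name from configuration and fall back to the hard-coded "nifi" value. The sketch below assumes a hypothetical property key, atlas.hook.nifi.user, built on the hook's existing "atlas.hook.nifi." prefix; neither that key nor the AtlasHookUserResolver class comes from the pull request.

```java
import java.util.Properties;

// Hypothetical sketch: resolve the user name reported to Atlas from configuration,
// falling back to the hard-coded default when nothing is configured.
final class AtlasHookUserResolver {

    static final String DEFAULT_NIFI_USER = "nifi";
    // Assumed key, reusing the hook's "atlas.hook.nifi." configuration prefix.
    static final String USER_PROPERTY = "atlas.hook.nifi.user";

    private AtlasHookUserResolver() {
    }

    static String resolveUser(final Properties conf) {
        final String configured = conf.getProperty(USER_PROPERTY);
        // Missing or blank values are treated as "not configured".
        if (configured == null || configured.trim().isEmpty()) {
            return DEFAULT_NIFI_USER;
        }
        return configured.trim();
    }
}
```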
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291500#comment-16291500 ] ASF GitHub Bot commented on NIFI-3709: -- Github user dmkoster commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157047074 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowPath; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.LineageEdge; +import org.apache.nifi.provenance.lineage.LineageNode; +import org.apache.nifi.provenance.lineage.LineageNodeType; + +import java.util.List; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class SimpleFlowPathLineage extends AbstractLineageStrategy { + +@Override +public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) { +final DataSetRefs refs = executeAnalyzer(analysisContext, event); +if (refs == null || (refs.isEmpty())) { +return; +} + +if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) { +processRemotePortEvent(analysisContext, nifiFlow, event, refs); +} else { +addDataSetRefs(nifiFlow, refs); +} + +} + +/** + * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received. + * Such an entity cannot be created in advance while analyzing the flow statically, + * because the ReportingTask cannot determine whether a component id belongs to a RemoteGroupPort: + * ConnectionStatus is the only information available in ReportingContext, + * and it only knows the component id, not the component type. + * For example, it cannot tell whether a connected component is a funnel or a RemoteGroupPort.
+ */ +private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) { + +final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType()); + +// Create a RemoteInputPort Process. +// event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid). +// See NIFI-4571 for detail. +final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next(); +final String portProcessId = event.getComponentId(); + +final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId); +remotePortProcess.setName(event.getComponentType()); +remotePortProcess.addProcessor(portProcessId); + +// For RemoteInputPort, need to find the previous component connected to this port, +// which passed this particular FlowFile. +// That is only possible by calling lineage API. +if (isRemoteInputPort) { +final Prove
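The branching in processEvent and processRemotePortEvent can be reduced to a dependency-free sketch: only "Remote Input Port" and "Remote Output Port" events take the remote-port path, and the port DataSet is taken from the analyzed outputs for a Remote Input Port (data is sent to the remote instance) and from the inputs for a Remote Output Port (data is received from it). Plain strings stand in for Atlas Referenceable objects, and RemotePortSketch is a hypothetical name, not part of the pull request.

```java
import java.util.List;

// Hypothetical, dependency-free sketch of the remote port branching quoted above.
final class RemotePortSketch {

    static final String REMOTE_INPUT_PORT = "Remote Input Port";
    static final String REMOTE_OUTPUT_PORT = "Remote Output Port";

    private RemotePortSketch() {
    }

    static boolean isRemotePort(final String componentType) {
        return REMOTE_INPUT_PORT.equals(componentType) || REMOTE_OUTPUT_PORT.equals(componentType);
    }

    // A Remote Input Port sends to the remote instance, so its port DataSet is an output;
    // a Remote Output Port receives from it, so its port DataSet is an input.
    static String selectRemotePortDataSet(final String componentType,
                                          final List<String> inputs,
                                          final List<String> outputs) {
        if (!isRemotePort(componentType)) {
            throw new IllegalArgumentException("Not a remote port: " + componentType);
        }
        final List<String> candidates = REMOTE_INPUT_PORT.equals(componentType) ? outputs : inputs;
        if (candidates.isEmpty()) {
            throw new IllegalStateException("No analyzed DataSet for " + componentType);
        }
        return candidates.get(0);
    }
}
```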
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291499#comment-16291499 ] ASF GitHub Bot commented on NIFI-3709: -- Github user dmkoster commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157029294 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hook.AtlasHook; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.nifi.atlas.provenance.lineage.LineageContext; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE; +import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; + +/** + * This class is not thread-safe as it holds uncommitted notification messages within instance. + * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread. + */ +public class NiFIAtlasHook extends AtlasHook implements LineageContext { --- End diff -- The class name follows a different capitalization pattern than the others. Perhaps a typo? NiFIAtlasHook vs NifiAtlasHook. 
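The class javadoc quoted in this diff says the hook holds uncommitted notification messages in the instance, and that addMessage and commitMessages should be called serially from a single thread. A minimal sketch of that buffer-then-commit pattern follows; MessageBuffer and its methods are hypothetical, and commit() merely returns the batch where the real hook would send notifications.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the "buffer, then commit" pattern. Not thread-safe by design:
// a single thread should call add() and commit() serially.
final class MessageBuffer<M> {

    private final List<M> uncommitted = new ArrayList<>();

    void add(final M message) {
        uncommitted.add(message);
    }

    // Hands back the buffered batch (standing in for sending it) and clears the buffer.
    List<M> commit() {
        if (uncommitted.isEmpty()) {
            return Collections.emptyList();
        }
        final List<M> batch = new ArrayList<>(uncommitted);
        uncommitted.clear();
        return Collections.unmodifiableList(batch);
    }

    int pendingCount() {
        return uncommitted.size();
    }
}
```

If concurrent callers ever had to be supported, synchronizing both add() and commit() on the same lock would be one straightforward option.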
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291501#comment-16291501 ] ASF GitHub Bot commented on NIFI-3709: -- Github user dmkoster commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157031810 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/AtlasUtils.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.atlas; + +import org.apache.atlas.model.instance.AtlasObjectId; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; + +public class AtlasUtils { + +public static String toStr(Object obj) { +return obj != null ? obj.toString() : null; +} + + +public static boolean isGuidAssigned(String guid) { +return guid != null && !guid.startsWith("-"); --- End diff -- The guid starting with a dash is an internal detail of the AtlasEntity class and might change. 
Checking for null should be sufficient.
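The two checks only disagree on placeholder values. The code under review assumes that an entity without a server-assigned GUID carries one starting with "-"; if that assumption holds, a null-only check would report such a placeholder as assigned. The sketch below makes the trade-off concrete; GuidChecks and both method names are hypothetical.

```java
// Hypothetical sketch contrasting the two checks discussed in the review.
final class GuidChecks {

    private GuidChecks() {
    }

    // Approach in the diff: a leading '-' is treated as an unassigned placeholder GUID.
    static boolean isGuidAssignedByPrefix(final String guid) {
        return guid != null && !guid.startsWith("-");
    }

    // Reviewer's suggestion: only check for null.
    static boolean isGuidAssignedByNullCheck(final String guid) {
        return guid != null;
    }
}
```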
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291346#comment-16291346 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on the issue: https://github.com/apache/nifi/pull/2335 @ijokarumawak thanks for all of the work that you've put into this - it is very much a non-trivial effort! For the most part, the code looks good. I flagged a couple of minor things in the code, 1 or 2 thread-safety issues that should be easy to address. The only 'more significant' concern that I have is the use of the dummied-up NiFiUser. As-is, this is an anonymous user and in a secured environment will not retrieve the event details that are necessary. It also means that we would be validating events against a user who doesn't even exist. I think there are 2 ways to approach this: first, as I noted inline, we could have a property to define which user the queries should run on behalf of. So the user could add a "NiFi Atlas" user and use that. However, that's also a bit concerning because it means that whoever has access to edit the reporting task can run provenance queries on behalf of another user. By far, my preference is to actually just update the ProvenanceRepository implementations (There are 4 now, I think) so that if a null User is passed in, we don't check permissions. This would mean that you can pass in null from Reporting Task. We could also then update the interface to have an overloaded method that does not require that a user be given. Once that is addressed, I think it is a +1 from me from a code review perspective. 
Thanks -Mark
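The preferred approach in this comment (skip permission checks when a null user is passed, and add an overload that does not require a user) can be modeled without NiFi classes. In the sketch below, EventSource and InMemoryEventSource are hypothetical stand-ins; they do not reproduce the real ProvenanceRepository method signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a provenance repository: a null user means
// "internal caller, skip authorization", as proposed in the review.
interface EventSource {

    List<String> getEvents(long firstEventId, int maxRecords, String user);

    // Overload for callers such as reporting tasks that act on behalf of no user.
    default List<String> getEvents(final long firstEventId, final int maxRecords) {
        return getEvents(firstEventId, maxRecords, null);
    }
}

final class InMemoryEventSource implements EventSource {

    private final List<String> events;
    private final Predicate<String> isAuthorized;

    InMemoryEventSource(final List<String> events, final Predicate<String> isAuthorized) {
        this.events = new ArrayList<>(events);
        this.isAuthorized = isAuthorized;
    }

    @Override
    public List<String> getEvents(final long firstEventId, final int maxRecords, final String user) {
        // Permissions are checked only when a user is supplied.
        if (user != null && !isAuthorized.test(user)) {
            throw new SecurityException("User not authorized: " + user);
        }
        final int from = (int) Math.min(Math.max(firstEventId, 0L), events.size());
        final int to = (int) Math.min((long) from + Math.max(maxRecords, 0), events.size());
        return new ArrayList<>(events.subList(from, to));
    }
}
```

A default method keeps existing implementations source-compatible, which is one reason an overload of this shape may be easier to introduce than a changed signature.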
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291319#comment-16291319 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157025778 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.AtlasNiFiFlowLineage/additionalDetails.html --- @@ -0,0 +1,541 @@ + + + + + +AtlasNiFiFlowLineage + + + + +AtlasNiFiFlowLineage + +Table of contents: + + +Information reported to Atlas +NiFi Atlas Types +Cluster Name Resolution +NiFi flow structure + +Path Separation Logic + + +NiFi data lineage + +NiFi Lineage Strategy +NiFi Provenance Event Analysis +Supported DataSets and Processors + + +How it runs in NiFi cluster +Limitations +Atlas Server Configurations +Atlas Server Emulator + + +Information reported to Atlas +This reporting task stores two types of NiFi flow information, 'NiFi flow structure' and 'NiFi data lineage'. + +'NiFi flow structure' tells what components are running within a NiFi flow and how they are connected. It is reported by analyzing the current NiFi flow structure, specifically NiFi component relationships. + +'NiFi data lineage' tells what part of a NiFi flow interacts with different DataSets, such as HDFS files or Hive tables. It is reported by analyzing NiFi provenance events. + + + +Technically, each type of information is sent using a different protocol: Atlas REST API v2, and notifications via a Kafka topic, as shown in the above image. + + +As both information types use the same NiFi Atlas Types and Cluster Name Resolution concepts, it is recommended to read those sections first. + +NiFi Atlas Types + +This reporting task creates the following NiFi-specific types in the Atlas type system when it runs, if these type definitions are not found.
+ +Green boxes represent sub-types of DataSet and blue ones are sub-types of Process. Gray lines represent entity ownership. +Red lines represent lineage. + + + + +nifi_flow +Represents a NiFi data flow. +As shown in the above diagram, nifi_flow owns other nifi_component types. +This ownership relationship is defined by the Atlas 'owned' constraint, so that when a 'nifi_flow' entity is removed, all owned NiFi component entities are removed in a cascading manner. +When this reporting task runs, it analyzes and traverses the entire flow structure, and creates NiFi component entities in Atlas. +On later runs, it compares the current flow structure with the one stored in Atlas to figure out whether any changes have been made since the flow was last reported. The reporting task updates NiFi component entities in Atlas if needed. +NiFi components that are removed from a NiFi flow are also deleted from Atlas. +However, those entities can still be seen in Atlas search results or lineage graphs, since Atlas uses 'Soft Delete' by default. +See Atlas Delete Handler for further details. + +Attributes: + +qualifiedName: Root ProcessGroup ID@clusterName (e.g. 86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1) +name: Name of the Root ProcessGroup. +url: URL of the NiFi instance. This can be specified via the reporting task's 'NiFi URL for Atlas' property. + + + +nifi_flow_path Part of a NiFi data flow containing one or more processing NiFi components such as Processors and RemoteGroupPorts. The reporting task divides a NiFi flow into multiple flow paths. See Path Separation Logic for details. +Attributes: + +qualifiedName: The first NiFi component Id in a path@clusterName (e.g. 529e6722-9b49-3b66-9c94-00da9863ca2d@cl1) +name: Names of the NiFi components within a path, concatenated (e.g. GenerateFlowFile, PutFile, LogAttribute) +u
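The documentation quoted above describes qualifiedName values of the form componentId@clusterName for both nifi_flow and nifi_flow_path. The helper below is a hypothetical illustration of building and splitting such values; QualifiedNames is not a class from the pull request, and splitting on the last '@' is an assumption that cluster names do not contain '@'.

```java
// Hypothetical helper illustrating the "<component id>@<cluster name>" qualifiedName format.
final class QualifiedNames {

    private QualifiedNames() {
    }

    static String toQualifiedName(final String componentId, final String clusterName) {
        if (componentId == null || componentId.isEmpty()) {
            throw new IllegalArgumentException("componentId is required");
        }
        if (clusterName == null || clusterName.isEmpty()) {
            throw new IllegalArgumentException("clusterName is required");
        }
        return componentId + "@" + clusterName;
    }

    // Splits at the last '@', assuming the cluster name itself contains no '@'.
    static String[] split(final String qualifiedName) {
        final int at = qualifiedName.lastIndexOf('@');
        if (at <= 0 || at == qualifiedName.length() - 1) {
            throw new IllegalArgumentException("Not in id@cluster form: " + qualifiedName);
        }
        return new String[] {qualifiedName.substring(0, at), qualifiedName.substring(at + 1)};
    }
}
```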
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291212#comment-16291212 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157009606 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java --- @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.reporting; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasServiceException; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.atlas.NiFIAtlasHook; +import org.apache.nifi.atlas.NiFiAtlasClient; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowAnalyzer; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.StandardAnalysisContext; +import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage; +import org.apache.nifi.atlas.provenance.lineage.LineageStrategy; +import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage; +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.atlas.resolver.ClusterResolvers; +import org.apache.nifi.atlas.resolver.RegexClusterResolver; +import org.apache.nifi.atlas.security.AtlasAuthN; +import org.apache.nifi.atlas.security.Basic; +import org.apache.nifi.atlas.security.Kerberos; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.ConfigurationContext; +import 
org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer; +import org.apache.nifi.ssl.SSLContextService; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import static org.apache.commons.lang3.St
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291193#comment-16291193 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157005913 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java --- @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.reporting;
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291192#comment-16291192 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157005524 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java --- @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.reporting;
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291190#comment-16291190 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157005140 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java --- @@ -0,0 +1,714 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.reporting;
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291185#comment-16291185 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r157004365 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance.lineage; + +import org.apache.atlas.typesystem.Referenceable; +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.NiFiFlowPath; +import org.apache.nifi.atlas.provenance.AnalysisContext; +import org.apache.nifi.atlas.provenance.DataSetRefs; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.LineageEdge; +import org.apache.nifi.provenance.lineage.LineageNode; +import org.apache.nifi.provenance.lineage.LineageNodeType; + +import java.util.List; + +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class SimpleFlowPathLineage extends AbstractLineageStrategy { + +@Override +public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) { +final DataSetRefs refs = executeAnalyzer(analysisContext, event); +if (refs == null || refs.isEmpty()) { +return; +} + +if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) { +processRemotePortEvent(analysisContext, nifiFlow, event, refs); +} else { +addDataSetRefs(nifiFlow, refs); +} + +} + +/** + * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received. + * Such an entity cannot be created in advance while analyzing the flow statically, + * because the ReportingTask cannot determine whether a component id belongs to a RemoteGroupPort: + * ConnectionStatus is the only information available in ReportingContext, + * and it carries the component id but not the component type. + * For example, it cannot tell whether a connected component is a funnel or a RemoteGroupPort.
+ */ +private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) { + +final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType()); + +// Create a RemoteInputPort Process. +// event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid). +// See NIFI-4571 for detail. +final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next(); +final String portProcessId = event.getComponentId(); + +final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId); +remotePortProcess.setName(event.getComponentType()); +remotePortProcess.addProcessor(portProcessId); + +// For RemoteInputPort, need to find the previous component connected to this port, +// which passed this particular FlowFile. +// That is only possible by calling lineage API. +if (isRemoteInputPort) { +final Prove
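The branching in `processEvent` and the choice of the remote port dataset in `processRemotePortEvent` can be illustrated with a small standalone sketch. It assumes a hypothetical `Refs` class standing in for `DataSetRefs` and plain strings standing in for Atlas `Referenceable` entities; it mirrors only the dispatch logic quoted above, not the actual NiFi implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class RemotePortDispatchSketch {

    // Hypothetical stand-in for DataSetRefs: dataset names an event reads from and writes to.
    static final class Refs {
        final List<String> inputs = new ArrayList<>();
        final List<String> outputs = new ArrayList<>();

        boolean isEmpty() {
            return inputs.isEmpty() && outputs.isEmpty();
        }
    }

    static boolean isRemotePort(String componentType) {
        return "Remote Input Port".equals(componentType) || "Remote Output Port".equals(componentType);
    }

    // Data leaves this NiFi through a Remote Input Port, so the remote port dataset is an output;
    // a Remote Output Port pulls data in, so its dataset is an input.
    static String remotePortDataSet(String componentType, Refs refs) {
        final boolean isRemoteInputPort = "Remote Input Port".equals(componentType);
        return isRemoteInputPort ? refs.outputs.get(0) : refs.inputs.get(0);
    }

    // Mirrors processEvent: skip empty analysis results, special-case remote ports,
    // otherwise hand the refs to the regular flow path handling.
    static String dispatch(String componentType, Refs refs) {
        if (refs == null || refs.isEmpty()) {
            return "skip";
        }
        return isRemotePort(componentType)
                ? "remote port dataset: " + remotePortDataSet(componentType, refs)
                : "regular flow path";
    }

    public static void main(String[] args) {
        Refs sent = new Refs();
        sent.outputs.add("port-A");
        Refs received = new Refs();
        received.inputs.add("port-B");
        System.out.println(dispatch("Remote Input Port", sent));      // remote port dataset: port-A
        System.out.println(dispatch("Remote Output Port", received)); // remote port dataset: port-B
        System.out.println(dispatch("PutFile", sent));                // regular flow path
        System.out.println(dispatch("PutFile", new Refs()));          // skip
    }
}
```

The remote port is handled at event time rather than during static flow analysis because, as the Javadoc explains, ConnectionStatus alone cannot reveal that a connected component is a RemoteGroupPort.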
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291122#comment-16291122 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r156992235 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public class StandardAnalysisContext implements AnalysisContext { + +private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class); +private final NiFiFlow nifiFlow; +private final ClusterResolver clusterResolver; +private final ProvenanceRepository provenanceRepository; + +public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver, + ProvenanceRepository provenanceRepository) { +this.nifiFlow = nifiFlow; +this.clusterResolver = clusterResolver; +this.provenanceRepository = provenanceRepository; +} + +@Override +public List<ConnectionStatus> findConnectionTo(String componentId) { +return nifiFlow.getIncomingRelationShips(componentId); +} + +@Override +public List<ConnectionStatus> findConnectionFrom(String componentId) { +return nifiFlow.getOutgoingRelationShips(componentId); +} + +@Override +public String getNiFiClusterName() { +return nifiFlow.getClusterName(); +} + +@Override +public ClusterResolver getClusterResolver() { +return clusterResolver; +} + +private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) { +final ComputeLineageResult result = submission.getResult(); +try { +if (result.awaitCompletion(10, TimeUnit.SECONDS)) { +return result; +} +logger.warn("Lineage query for {} timed out.", eventId);
+} catch (InterruptedException e) { +logger.warn("Lineage query for {} was interrupted.", eventId, e); +} finally { +submission.cancel(); +} + +return null; +} + +@Override +public ComputeLineageResult queryLineage(long eventId) { +final ComputeLineageSubmission submission = provenanceRepository.submitLineageComputation(eventId, NIFI_USER); +return getLineageResult(eventId, submission); +} + +public ComputeLineageResult findParents(long eventId) { +final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER); +return getLineageResult(eventId, submission); +} + +// NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation +private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser(); --- End diff --
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291088#comment-16291088 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r156985994 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.atlas.NiFiFlow; +import org.apache.nifi.atlas.resolver.ClusterResolver; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.lineage.ComputeLineageResult; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public class StandardAnalysisContext implements AnalysisContext { + +private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class); +private final NiFiFlow nifiFlow; +private final ClusterResolver clusterResolver; +private final ProvenanceRepository provenanceRepository; + +public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver, + ProvenanceRepository provenanceRepository) { +this.nifiFlow = nifiFlow; +this.clusterResolver = clusterResolver; +this.provenanceRepository = provenanceRepository; +} + +@Override +public List<ConnectionStatus> findConnectionTo(String componentId) { +return nifiFlow.getIncomingRelationShips(componentId); +} + +@Override +public List<ConnectionStatus> findConnectionFrom(String componentId) { +return nifiFlow.getOutgoingRelationShips(componentId); +} + +@Override +public String getNiFiClusterName() { +return nifiFlow.getClusterName(); +} + +@Override +public ClusterResolver getClusterResolver() { +return clusterResolver; +} + +private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) { +final ComputeLineageResult result = submission.getResult(); +try { +if (result.awaitCompletion(10, TimeUnit.SECONDS)) { --- End diff -- I'm not yet sure of the context in which this is used.
However, this is a bit of a red flag. Depending on the size of the Provenance Repository and its implementation, computing a FlowFile's lineage may take many minutes (consider a Provenance Repository that is a couple of terabytes in size, where the event is one of the oldest; it will have to search all Lucene indices for it). In most cases it is fairly quick, but we should not assume that it will be. What exactly is the implication here if this always times out? > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > > While Apache NiFi has provenance and event level lineage support within its > data flow, Apache Atlas also does manage lineage between dataset and process > those interacting with such data. > It would be beneficial for users who use both NiFi and Atl
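The concern above is about what callers do when the bounded wait in `getLineageResult` gives up. As a sketch of the same "wait at most N, then always cancel" shape, here it is written with `java.util.concurrent.Future` instead of NiFi's `ComputeLineageSubmission`/`ComputeLineageResult` (an assumption for illustration; the NiFi types are not used). Returning `Optional.empty()` rather than `null` makes the "no lineage available" outcome explicit, so every caller has to decide what a timeout means for the Atlas entities it would otherwise create.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedLineageWaitSketch {

    // Waits at most `timeout` for a (hypothetical) lineage computation and always cancels it afterwards,
    // so a slow query against a large provenance repository does not keep running in the background.
    static <T> Optional<T> awaitOrGiveUp(Future<T> computation, long timeout, TimeUnit unit) {
        try {
            return Optional.ofNullable(computation.get(timeout, unit));
        } catch (TimeoutException e) {
            return Optional.empty(); // caller must handle "lineage not available in time"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return Optional.empty();
        } catch (ExecutionException e) {
            return Optional.empty(); // the computation itself failed
        } finally {
            computation.cancel(true); // no effect if the computation already completed
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = CompletableFuture.completedFuture("lineage");
        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        System.out.println(awaitOrGiveUp(fast, 10, TimeUnit.MILLISECONDS));           // Optional[lineage]
        System.out.println(awaitOrGiveUp(neverCompletes, 10, TimeUnit.MILLISECONDS)); // Optional.empty
    }
}
```

Restoring the interrupt flag is a deliberate difference from the quoted code, which only logs the `InterruptedException`.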
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291062#comment-16291062 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r156984919 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas.provenance; + +import org.apache.nifi.provenance.ProvenanceEventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; + +public class NiFiProvenanceEventAnalyzerFactory { + +private static final Logger logger = LoggerFactory.getLogger(NiFiProvenanceEventAnalyzerFactory.class); +private static final Map<Pattern, NiFiProvenanceEventAnalyzer> analyzersForComponentType = new ConcurrentHashMap<>(); +private static final Map<Pattern, NiFiProvenanceEventAnalyzer> analyzersForTransitUri = new ConcurrentHashMap<>(); +private static final Map<ProvenanceEventType, NiFiProvenanceEventAnalyzer> analyzersForProvenanceEventType = new ConcurrentHashMap<>(); +private static boolean loaded = false; + +private static void loadAnalyzers() { +logger.debug("Loading NiFiProvenanceEventAnalyzer ..."); +final ServiceLoader<NiFiProvenanceEventAnalyzer> serviceLoader += ServiceLoader.load(NiFiProvenanceEventAnalyzer.class); +serviceLoader.forEach(analyzer -> { +addAnalyzer(analyzer.targetComponentTypePattern(), analyzersForComponentType, analyzer); +addAnalyzer(analyzer.targetTransitUriPattern(), analyzersForTransitUri, analyzer); +final ProvenanceEventType eventType = analyzer.targetProvenanceEventType(); +if (eventType != null) { +if (analyzersForProvenanceEventType.containsKey(eventType)) { +logger.warn("For ProvenanceEventType {}, an Analyzer {} is already assigned." + +" Only one analyzer for a type can be registered. Ignoring {}",
+eventType, analyzersForProvenanceEventType.get(eventType), analyzer); +} else { +analyzersForProvenanceEventType.put(eventType, analyzer); +} +} +}); +logger.info("Loaded NiFiProvenanceEventAnalyzers: componentTypes={}, transitUris={}", analyzersForComponentType, analyzersForTransitUri); +} + +private static void addAnalyzer(String patternStr, Map<Pattern, NiFiProvenanceEventAnalyzer> toAdd, +NiFiProvenanceEventAnalyzer analyzer) { +if (patternStr != null && !patternStr.isEmpty()) { +Pattern pattern = Pattern.compile(patternStr.trim()); +toAdd.put(pattern, analyzer); +} +} + +/** + * Find and retrieve a NiFiProvenanceEventAnalyzer implementation for the specified targets. + * Pattern matching is performed in the following order, and the first match is returned: + * <ol> + * <li>Component type name. Use an analyzer supporting the component type with its {@link NiFiProvenanceEventAnalyzer#targetComponentTypePattern()}.</li> + * <li>TransitUri. Use an analyzer supporting the TransitUri with its {@link NiFiProvenanceEventAnalyzer#targetTransitUriPattern()}.</li> + * <li>Provenance Event Type. Use an analyzer supporting the Provenance Event Type with its {@link NiFiProvenanceEventAnalyzer#targetProvenanceEventType()}.</li> + * </ol> + * @param typeName NiFi component type name. + * @param transitUri Transit URI. + * @param eventType Provenance event type. + * @return Instance of NiFiProvenanceEventAnalyzer if one is found for the specified targets, otherwise null. + */ +p
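The lookup order documented in the Javadoc above (component type, then transit URI, then provenance event type) can be sketched as a standalone class. This is a sketch under stated assumptions: analyzers are plain names, event types are strings instead of `ProvenanceEventType`, and patterns are tested with `Matcher.matches()` (full match). Whether the real factory uses `matches()` or `find()` is not shown in the excerpt. Registration keeps the first analyzer per event type, matching what the "Ignoring {}" warning says.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class AnalyzerLookupSketch {

    // Hypothetical simplified registries; the factory above stores NiFiProvenanceEventAnalyzer instances.
    private final Map<Pattern, String> byComponentType = new LinkedHashMap<>();
    private final Map<Pattern, String> byTransitUri = new LinkedHashMap<>();
    private final Map<String, String> byEventType = new LinkedHashMap<>();

    public void register(String analyzer, String componentTypePattern, String transitUriPattern, String eventType) {
        addPattern(componentTypePattern, byComponentType, analyzer);
        addPattern(transitUriPattern, byTransitUri, analyzer);
        if (eventType != null) {
            // Keep the first analyzer registered for an event type and ignore later ones.
            byEventType.putIfAbsent(eventType, analyzer);
        }
    }

    private static void addPattern(String patternStr, Map<Pattern, String> target, String analyzer) {
        if (patternStr != null && !patternStr.isEmpty()) {
            target.put(Pattern.compile(patternStr.trim()), analyzer);
        }
    }

    // Returns the analyzer of the first pattern that fully matches the value, or null.
    private static String firstMatch(Map<Pattern, String> analyzers, String value) {
        if (value == null) {
            return null;
        }
        for (Map.Entry<Pattern, String> entry : analyzers.entrySet()) {
            if (entry.getKey().matcher(value).matches()) {
                return entry.getValue();
            }
        }
        return null;
    }

    // Lookup order: component type, then transit URI, then provenance event type.
    public String get(String typeName, String transitUri, String eventType) {
        String analyzer = firstMatch(byComponentType, typeName);
        if (analyzer == null) {
            analyzer = firstMatch(byTransitUri, transitUri);
        }
        if (analyzer == null && eventType != null) {
            analyzer = byEventType.get(eventType);
        }
        return analyzer;
    }

    public static void main(String[] args) {
        AnalyzerLookupSketch factory = new AnalyzerLookupSketch();
        factory.register("kafka", "^(Publish|Consume)Kafka.*", null, null);
        factory.register("http", null, "^https?://.+", null);
        factory.register("create", null, null, "CREATE");
        factory.register("create-duplicate", null, null, "CREATE");

        System.out.println(factory.get("PublishKafka_0_10", "http://example.com/x", "SEND")); // kafka
        System.out.println(factory.get("InvokeHTTP", "http://example.com/x", "SEND"));        // http
        System.out.println(factory.get("GenerateFlowFile", null, "CREATE"));                  // create
        System.out.println(factory.get("LogAttribute", null, "DROP"));                        // null
    }
}
```

The first call shows the precedence: the component type pattern wins even though the transit URI would also match the HTTP analyzer.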
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291054#comment-16291054 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r156982816 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java --- @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.atlas; + +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.SearchFilter; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.nifi.atlas.security.AtlasAuthN; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.MultivaluedMap; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName; +import static org.apache.nifi.atlas.AtlasUtils.toStr; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED; +import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS; 
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME; +import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL; +import static org.apache.nifi.atlas.NiFiTypes.ENTITIES; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT; +import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE; + +public class NiFiAtlasClient { + +private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class); + +private static NiFiAtlasClient nifiClient; +private AtlasClientV2 atlasClient; + +private NiFiAtlasClient() { +super(); +}
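The fields quoted above (a `private static` instance plus a `private` constructor) follow the singleton pattern. The diff context is cut off before any accessor, so how the original class hands out its instance is not shown here. As a hedged illustration only, a lazily initialized, thread-safe accessor could look like the following; `LineageClient` is a hypothetical stand-in, and the double-checked locking is only correct because the field is `volatile`.

```java
// Hypothetical stand-in for a client class that should exist once per JVM.
final class LineageClient {
    // volatile makes double-checked locking safe under the Java Memory Model (Java 5+).
    private static volatile LineageClient instance;

    // Private constructor: callers must go through getInstance().
    private LineageClient() {
    }

    static LineageClient getInstance() {
        LineageClient local = instance; // single volatile read on the fast path
        if (local == null) {
            synchronized (LineageClient.class) {
                local = instance; // re-check while holding the lock
                if (local == null) {
                    local = new LineageClient();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

If no configuration has to be passed in at creation time, an initialization-on-demand holder class would be an equally valid, simpler alternative.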
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291051#comment-16291051 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r156982630 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java --- @@ -0,0 +1,537 @@
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291034#comment-16291034 ] ASF GitHub Bot commented on NIFI-3709: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2335#discussion_r156979528 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java --- @@ -0,0 +1,537 @@
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285296#comment-16285296 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/2335 Dear reviewers, this PR has more than 20K lines of code, so it may be difficult to review quickly. However, I have written as much documentation as possible in additionalDetail.html of the NiFiAtlasFlowLineage reporting task, which will hopefully make the review cycle quicker. Please read the doc first. Thanks in advance for your time and effort! > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > > While Apache NiFi has provenance and event level lineage support within its > data flow, Apache Atlas also manages lineage between datasets and the > processes that interact with them. > It would be beneficial for users of both NiFi and Atlas to > see end-to-end data lineage in the Atlas lineage graph, as some types of datasets, > for example Kafka topics and Hive tables, are processed both by NiFi and by > technologies around Atlas such as Storm, Falcon or Sqoop. > In order to make this integration happen, I propose a NiFi reporting task > that analyzes a NiFi flow and then creates DataSet and Process entities in Atlas. > The challenge is how to design NiFi flow dataset level lineage within the Atlas > lineage graph. > If we just add a single NiFi process and connect every DataSet from/to it, it > would be too ambiguous, since it won't be clear which part of a NiFi flow > actually interacts with a given dataset. > But if we represent every NiFi processor as an independent process in Atlas, it would > be too granular.
Also, since we already have detailed event level lineage in > NiFi, we wouldn't need the same level in Atlas. > Grouping certain processors in a NiFi flow as a single process in Atlas > would give a good level of granularity. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285293#comment-16285293 ] ASF GitHub Bot commented on NIFI-3709: -- GitHub user ijokarumawak opened a pull request: https://github.com/apache/nifi/pull/2335 NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [x] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/ijokarumawak/nifi nifi-3709-5 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2335.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2335 commit d710ceee0fd6076c9bac90f0e3b55f5bb990bd33 Author: Koji Kawamura Date: 2017-10-30T03:41:27Z NIFI-3709: Export NiFi flow dataset lineage to Apache Atlas
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16276267#comment-16276267 ] Koji Kawamura commented on NIFI-3709: - Until NIFI-4564 is addressed, Site-to-Site (S2S) transfers using the RAW transport protocol will not be reported in Atlas lineage.
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235464#comment-16235464 ] Koji Kawamura commented on NIFI-3709: - [~ikethecoder] I've added an extension point inspired by your improvement. Would you rebase your branch onto this one? Other parts have changed too, so you may need to adjust your code. However, adding a FlowFile level lineage strategy should be easier than before. https://github.com/ijokarumawak/nifi/tree/nifi-3709-4 If you can extract a commit that contains only the FlowFile level lineage strategy additions, it would be very easy for me to cherry-pick it, so that your contribution is recorded in the commit history and included when I create a PR from my branch. Also, I've added another strategy; my branch now has both 'Simple Path' and 'Complete Path'. I've summarized a comparison of the two in this Gist. Would you take a look? I wonder whether 'Complete Path' would be useful for your use case, too. Please let me know what you think. Thanks! https://gist.github.com/ijokarumawak/9c3f2b0e44d1f3f2cb1addc9eb53d893 > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura >Priority: Major > > While Apache NiFi has provenance and event level lineage support within its > data flow, Apache Atlas also does manage lineage between dataset and process > those interacting with such data. > It would be beneficial for users who use both NiFi and Atlas and if they can > see end-to-end data lineage on Atlas lineage graph, as some type of dataset > are processed by both NiFi and technologies around Atlas such as Storm, > Falcon or Sqoop. For example, Kafka topics and Hive tables.
> In order to make this integration happen, I propose a NiFi reporting task > that analyzes NiFi flow then creates DataSet and Process entities in Atlas. > The challenge is how to design NiFi flow dataset level lineage within Atlas > lineage graph. > If we just add a single NiFi process and connect every DataSet from/to it, it > would be too ambiguous since it won't be clear which part of a NiFi flow > actually interact with certain dataset. > But if we put every NiFi processor as independent process in Atlas, it would > be too granular, too. Also, we already have detailed event level lineage in > NiFi, we wouldn't need the same level in Atlas. > If we can group certain processors in a NiFI flow as a process in Atlas, it > would be a nice granularity. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224538#comment-16224538 ] Koji Kawamura commented on NIFI-3709: - [~ikethecoder] Thanks for sharing the detailed example. I understand the motivation to track lineage 'By File'. I'll think about it a bit more to see whether there are other ways to achieve that.
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224291#comment-16224291 ] Aidan Cope commented on NIFI-3709: -- True, and we were aware of the lineage capability in NiFi itself, but as you mention, it is limited to what is tracked in NiFi, so you miss out on the "full" lineage. I've prepared an example to try and show where the requirement is coming from: https://github.com/ikethecoder/nifi-atlas/blob/master/scenarios/path_vs_file_comparison/comparison.md I built a flow that shows a file being received, processed and sent to HDFS. I ran files through it with the reporter configured for "By Path" and then again for "By File", and included screenshots of the results in Atlas. Have a look: you will see that the "By Path" strategy does not give us the detail that we want. You may have ideas on how to meet the same requirement in a way that is consistent with the "By Path" strategy. Cheers
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218156#comment-16218156 ] Koji Kawamura commented on NIFI-3709: - [~ikethecoder] Thanks for sharing your work. I'm glad to know that you were able to use the extension point to analyze provenance events, for example for PutFile. Those parts can be merged easily, so let me merge your commit once my work has been squashed into a final commit. Let's keep in touch! As for the ByFileLineageStrategy, I am not quite sure it will work well with Atlas, because Atlas is designed for tracking DataSet level lineage rather than event level lineage. That's why I decided to reduce the granularity to the DataSet and flow_path level when the NiFi reporting task reports lineage to Atlas. If a user would like to see the complete and detailed lineage of a FlowFile, I think the NiFi provenance viewer is more appropriate. Using the NiFi provenance UI, a user can see where the FlowFile came from and where it went. https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage I think the core benefit of exporting NiFi lineage to Atlas is that it lets users see a bigger picture, not only within NiFi but also across other software such as Apache Storm and Kafka. Also, IMHO, a FlowFile lives inside a NiFi flow and does not have the same level of persistence as a database record or a file on a file system. However, I may be missing something. Would you elaborate on the benefit of exporting FlowFile level lineage to Atlas? An example Atlas lineage screenshot would help me understand the use case. Thanks!
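To illustrate the DataSet and flow_path level granularity Koji describes, here is a minimal sketch, assuming a hypothetical `DataSetEvent` type that records which flow path read or wrote which dataset. Collapsing events to dataset level keeps one set of input and output datasets per flow path, no matter how many FlowFiles passed through. This is not how the reporting task is implemented; the class names and the `name@cluster` strings in the example are illustrative only.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, simplified result of analyzing one provenance event:
// which flow path touched which dataset, and in which direction.
final class DataSetEvent {
    final String flowPathId;
    final String dataSetQualifiedName;
    final boolean isInput; // true if the flow path read the dataset, false if it wrote it

    DataSetEvent(String flowPathId, String dataSetQualifiedName, boolean isInput) {
        this.flowPathId = flowPathId;
        this.dataSetQualifiedName = dataSetQualifiedName;
        this.isInput = isInput;
    }
}

// Dataset-level lineage of one flow path: distinct inputs and outputs only.
final class FlowPathLineage {
    final Set<String> inputs = new TreeSet<>();
    final Set<String> outputs = new TreeSet<>();
}

final class DataSetLevelAggregator {
    // Collapses any number of events into one entry per flow path. Repeated events
    // against the same dataset add no new edges, so the result grows with the number
    // of flow paths and datasets, not with the number of FlowFiles.
    static Map<String, FlowPathLineage> aggregate(Iterable<DataSetEvent> events) {
        Map<String, FlowPathLineage> byFlowPath = new TreeMap<>();
        for (DataSetEvent event : events) {
            FlowPathLineage lineage = byFlowPath.computeIfAbsent(event.flowPathId, id -> new FlowPathLineage());
            Set<String> side = event.isInput ? lineage.inputs : lineage.outputs;
            side.add(event.dataSetQualifiedName);
        }
        return byFlowPath;
    }
}
```

Per-FlowFile detail is deliberately discarded here; in this model it would remain available only in NiFi's own provenance repository.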
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16218124#comment-16218124 ] Aidan Cope commented on NIFI-3709: -- I have been working on some enhancements to the work that was being done by Koji here: https://github.com/ijokarumawak/nifi/tree/nifi-3709-2/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task The work in progress can be seen here: https://github.com/ikethecoder/nifi/tree/nifi-3709-2/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task Would like to get feedback on the proposed changes I am doing. It is for a client, and their requirements require quite a detailed tracking of provenance data from NiFi to Atlas. To this end, I added a parameter to the reporter task to indicate the granularity of the Atlas processes - with the two options being "By Flow Path" and "By Flow File". By Flow Path is the existing approach by Koji. The "By Flow File" uses the File UUID as the qualifier name for the process, rather than the component Id. The client wants to be able to track files through the complete NiFi Flow over time, which can not be easily done using the way it was implemented. I think both use strategies are valid, depending on the use case and requirement - which is why I made it as a parameter on the reporting task. But would really appreciate feedback on whether this is a viable approach and if it can be considered for a pull request? > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > > While Apache NiFi has provenance and event level lineage support within its > data flow, Apache Atlas also does manage lineage between dataset and process > those interacting with such data. 
> It would be beneficial for users who use both NiFi and Atlas and if they can > see end-to-end data lineage on Atlas lineage graph, as some type of dataset > are processed by both NiFi and technologies around Atlas such as Storm, > Falcon or Sqoop. For example, Kafka topics and Hive tables. > In order to make this integration happen, I propose a NiFi reporting task > that analyzes NiFi flow then creates DataSet and Process entities in Atlas. > The challenge is how to design NiFi flow dataset level lineage within Atlas > lineage graph. > If we just add a single NiFi process and connect every DataSet from/to it, it > would be too ambiguous since it won't be clear which part of a NiFi flow > actually interact with certain dataset. > But if we put every NiFi processor as independent process in Atlas, it would > be too granular, too. Also, we already have detailed event level lineage in > NiFi, we wouldn't need the same level in Atlas. > If we can group certain processors in a NiFI flow as a process in Atlas, it > would be a nice granularity. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
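The two granularity options differ only in which identifier becomes the Atlas process's qualified name. The Java sketch below illustrates that difference under stated assumptions: the enum `ProcessGranularity`, the method `qualifiedNameFor`, and the `identifier@clusterName` format are all hypothetical and are not taken from the reporting task's code.

```java
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical model of the "By Flow Path" / "By Flow File" reporting-task parameter. */
enum ProcessGranularity {
    BY_FLOW_PATH, // one Atlas process per flow path, keyed by component id
    BY_FLOW_FILE; // one Atlas process per FlowFile, keyed by FlowFile UUID

    /** Builds the qualified name using an assumed "<identifier>@<cluster>" convention. */
    String qualifiedNameFor(String componentId, String flowFileUuid, String clusterName) {
        Objects.requireNonNull(clusterName, "clusterName");
        String id = (this == BY_FLOW_PATH) ? componentId : flowFileUuid;
        Objects.requireNonNull(id, "identifier required for " + this);
        return id + "@" + clusterName;
    }
}

class GranularityDemo {
    public static void main(String[] args) {
        String componentId = "proc-1";             // hypothetical component id
        String[] flowFileUuids = {"ff-a", "ff-b"}; // hypothetical FlowFile UUIDs
        for (ProcessGranularity g : ProcessGranularity.values()) {
            // Collect the distinct process entities each option would create.
            Set<String> entities = new LinkedHashSet<>();
            for (String uuid : flowFileUuids) {
                entities.add(g.qualifiedNameFor(componentId, uuid, "prod"));
            }
            System.out.println(g + " -> " + entities);
        }
    }
}
```

Running `main` prints `BY_FLOW_PATH -> [proc-1@prod]` and `BY_FLOW_FILE -> [ff-a@prod, ff-b@prod]`: the per-file option gives one entity per FlowFile, which enables per-file tracking at the cost of many more entities in Atlas.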
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154615#comment-16154615 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak closed the pull request at: https://github.com/apache/nifi/pull/1676 > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154614#comment-16154614 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/1676 Closing this PR. I'm adding the use of Provenance events so that more Processors can report lineage, which also covers NiFi Expression Language. The WIP code is available here, and I'm going to create another PR once it's ready to be reviewed. Thanks! https://github.com/ijokarumawak/nifi/tree/nifi-3709-2/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura
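Using Provenance events means a dataset name can be read from values that exist only at runtime, after any Expression Language has been evaluated. As a minimal sketch, assuming a Kafka SEND event whose transit URI has the form `PROTOCOL://broker:port/topic` (an assumption for illustration, not a documented contract), the topic could be extracted with `java.net.URI`. The class `TransitUriTopics` and its method are hypothetical.

```java
import java.net.URI;
import java.util.Optional;

/** Hypothetical helper: derive a Kafka topic name from a provenance event's transit URI. */
final class TransitUriTopics {
    private TransitUriTopics() {}

    /** Assumes "<protocol>://<broker>:<port>/<topic>"; returns empty if that shape is absent. */
    static Optional<String> topicFromTransitUri(String transitUri) {
        if (transitUri == null || transitUri.isEmpty()) {
            return Optional.empty();
        }
        final URI uri;
        try {
            uri = URI.create(transitUri);
        } catch (IllegalArgumentException e) {
            return Optional.empty(); // not a parseable URI
        }
        String path = uri.getPath();
        if (path == null || path.length() <= 1) {
            return Optional.empty(); // no "/topic" segment
        }
        return Optional.of(path.substring(1)); // drop the leading '/'
    }

    public static void main(String[] args) {
        // At runtime this value would come from the event, with EL already resolved.
        System.out.println(topicFromTransitUri("PLAINTEXT://broker1:9092/orders"));
    }
}
```

The point of the design is that nothing here inspects processor properties: whatever the processor actually sent to is what gets reported.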
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154305#comment-16154305 ] ASF GitHub Bot commented on NIFI-3709: -- Github user gjlawran commented on the issue: https://github.com/apache/nifi/pull/1676 We are also looking for assistance with bridging metadata from NiFi into Apache Atlas, and we have posted a [paid opportunity](https://github.com/bcgov/nifi-atlas/issues/1) for those with an interest. > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15983595#comment-15983595 ] Joseph Witt commented on NIFI-3709: --- [~ijokarumawak] I've removed the fix version for now. In my opinion it isn't quite ready for a release. If you disagree, please advise. > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982308#comment-15982308 ] ASF GitHub Bot commented on NIFI-3709: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/1676 @ijokarumawak I see where this is heading and I get how it can help. That said, given Expression Language and similar features, I'm concerned about how much real flow information can be gleaned at design time (that is, by looking at the flow definition itself) versus at runtime using the provenance data itself. The current approach has some important limitations; for example, the processors that can be supported have to be explicitly referenced in code for now. The runtime approach using provenance events obviously has scale concerns, but perhaps it is more correct and warrants further review. What do you think? > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > Fix For: 1.2.0
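The design-time concern can be illustrated with a toy example. The sketch below uses a simplified `${name}` substitution (hypothetical, and far smaller than real NiFi Expression Language) to show that a topic property such as `events-${region}` cannot be resolved from the flow definition alone, while at runtime each FlowFile's attributes resolve it to a different dataset. The class and method names are invented for illustration.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy illustration only: a "${name}" substitution, NOT NiFi Expression Language. */
final class DesignTimeVsRuntime {
    private static final Pattern REF = Pattern.compile("\\$\\{([^}]+)\\}");

    private DesignTimeVsRuntime() {}

    /** Replaces ${name} with attributes.get(name); leaves unknown references untouched. */
    static String resolve(String propertyValue, Map<String, String> attributes) {
        Matcher m = REF.matcher(propertyValue);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = attributes.get(m.group(1));
            String replacement = (value != null) ? value : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    /** True when no ${...} reference remains in the value. */
    static boolean isResolved(String value) {
        return !REF.matcher(value).find();
    }

    public static void main(String[] args) {
        String topicProperty = "events-${region}";
        // Design time: only the flow definition is available, so there are no attributes.
        System.out.println(resolve(topicProperty, Map.of()));
        // Runtime: each FlowFile carries its own attributes.
        System.out.println(resolve(topicProperty, Map.of("region", "eu")));
        System.out.println(resolve(topicProperty, Map.of("region", "us")));
    }
}
```

The design-time call prints `events-${region}`, while the two runtime calls print `events-eu` and `events-us`: two distinct Kafka topics that a design-time analysis would have to either skip or report as one unresolved name.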
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982297#comment-15982297 ] ASF GitHub Bot commented on NIFI-3709: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/1676#discussion_r113096464
--- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/processors/EgressProcessors.java ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.processors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EgressProcessors {
+
+    private static final Map processors = new HashMap<>();
+
+    static {
+        processors.put("org.apache.nifi.processors.hive.PutHiveStreaming", new PutHiveStreaming());
--- End diff --
So these Egress/Ingress processors are all that would be supported initially I take it? It will be interesting to see how this works out and then perhaps we can have this done via an Annotation on processors that could be supported for egress/ingress.
> Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > Fix For: 1.2.0
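The annotation idea could replace a hard-coded class-name map like the one in `EgressProcessors`. A minimal sketch, assuming a hypothetical `@ReportsLineage` annotation invented for this illustration and read via reflection at runtime; the `Example*Processor` classes are stand-ins, not NiFi processors.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Optional;

/** Hypothetical annotation marking a processor as a lineage ingress or egress point. */
@Retention(RetentionPolicy.RUNTIME) // must be visible to reflection at runtime
@Target(ElementType.TYPE)
@interface ReportsLineage {
    Direction value();

    enum Direction { INGRESS, EGRESS }
}

/** Stand-in processor classes used only for this illustration. */
@ReportsLineage(ReportsLineage.Direction.EGRESS)
class ExamplePutProcessor {}

@ReportsLineage(ReportsLineage.Direction.INGRESS)
class ExampleGetProcessor {}

class ExampleUnannotatedProcessor {}

final class LineageDirectionLookup {
    private LineageDirectionLookup() {}

    /** Reads the direction from the annotation instead of a central class-name map. */
    static Optional<ReportsLineage.Direction> directionOf(Class<?> processorClass) {
        ReportsLineage annotation = processorClass.getAnnotation(ReportsLineage.class);
        return Optional.ofNullable(annotation).map(ReportsLineage::value);
    }

    public static void main(String[] args) {
        System.out.println(directionOf(ExamplePutProcessor.class));
        System.out.println(directionOf(ExampleGetProcessor.class));
        System.out.println(directionOf(ExampleUnannotatedProcessor.class));
    }
}
```

An unannotated processor yields `Optional.empty`, so the reporting task could simply skip it, and supporting a new processor would mean annotating that processor rather than editing a central registry.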
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982295#comment-15982295 ] ASF GitHub Bot commented on NIFI-3709: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/1676#discussion_r113096205 --- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml --- @@ -0,0 +1,102 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + + +org.apache.nifi +nifi-atlas-bundle +1.2.0-SNAPSHOT + + +nifi-atlas-reporting-task +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-processor-utils + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-client-dto + + +org.apache.atlas +atlas-client + + +org.apache.atlas +atlas-intg + + +org.apache.atlas +atlas-common + + +org.codehaus.jettison +jettison +1.1 + + + +stax +stax-api + + + + +com.sun.jersey +jersey-json + + +org.codehaus.jackson +jackson-jaxrs +1.9.13 + + +org.codehaus.jackson +jackson-xc +1.9.13 + + + + +org.apache.nifi +nifi-mock +test + + +org.slf4j +slf4j-simple +test + + +junit +junit +4.11 --- End diff -- You can remove this version number, and probably the whole block: we have junit as a test-scope dependency in the parent pom. > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > Fix For: 1.2.0
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982292#comment-15982292 ] ASF GitHub Bot commented on NIFI-3709: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/1676#discussion_r113095894
--- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-nar/src/main/resources/META-INF/NOTICE ---
@@ -0,0 +1,188 @@
+nifi-atlas-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===
+Apache Software License v2
+===
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Atlas (incubating)
+The following NOTICE information applies:
+ Apache Atlas (incubating)
+
+ Copyright [2015-2017] The Apache Software Foundation
+
+ This product includes software developed at
--- End diff --
You can remove lines 19 and 20. > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > Fix For: 1.2.0
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976284#comment-15976284 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/1676 @joewitt Added L&N. I hope I've done it correctly; please check. BTW, [SmartBear has acquired Reverb Technologies](https://smartbear.com/news/news-releases/sponsorship-of-swagger/) and the copyright has been changed to [SmartBear Software](https://github.com/swagger-api/swagger-core/blob/master/LICENSE). nifi-assembly and nifi-framework-nar still have 'Copyright 2015 Reverb Technologies, Inc.' in their NOTICE files. Should we update those, too? > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > Fix For: 1.2.0
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976131#comment-15976131 ] ASF GitHub Bot commented on NIFI-3709: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/1676 @joewitt Thanks for pointing that out. I will update NOTICE. > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > Fix For: 1.2.0
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976124#comment-15976124 ] ASF GitHub Bot commented on NIFI-3709: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/1676 It doesn't quite look like all L&N (LICENSE and NOTICE) considerations for the nar have been addressed. Please update. > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > Fix For: 1.2.0
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976123#comment-15976123 ] ASF GitHub Bot commented on NIFI-3709: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/1676#discussion_r112369112
--- Diff: nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-nar/src/main/resources/META-INF/NOTICE ---
@@ -0,0 +1,26 @@
+nifi-hbase-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===
+Apache Software License v2
+===
+
+The following binary components are provided under the Apache Software License v2
+
+ (ASLv2) Apache Atlas
+The following NOTICE information applies:
+ Apache Commons BeanUtils
--- End diff --
Copy and paste error. Should say 'Apache Atlas' > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > Fix For: 1.2.0
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976122#comment-15976122 ] Joseph Witt commented on NIFI-3709: --- Patch looks ready to roll, and someone just asked about this capability today on the users list. I've set the fix version and will review more fully shortly. The nar size is down to 13MB, which is really nice. > Export NiFi flow dataset lineage to Apache Atlas > > > Key: NIFI-3709 > URL: https://issues.apache.org/jira/browse/NIFI-3709 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura > Fix For: 1.2.0
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976120#comment-15976120 ] ASF GitHub Bot commented on NIFI-3709: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/1676 @ijokarumawak can you go ahead and squash these. Thanks for getting it down to 13MB for the nar. Much better!
[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas
[ https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15971333#comment-15971333 ] ASF GitHub Bot commented on NIFI-3709: -- GitHub user ijokarumawak opened a pull request: https://github.com/apache/nifi/pull/1676

NIFI-3709: NiFi flow lineage to Apache Atlas. Adding AtlasNiFiFlowLineage reporting task to create NiFi lineage in Atlas. See the additionalDetails.html for details.

Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in which it is rendered?
### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ijokarumawak/nifi nifi-3709

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/1676.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1676

commit 70573ba42b282cc92fbbab5d631dd0ed21ab9a00
Author: Koji Kawamura
Date: 2017-03-27T23:01:02Z

    NIFI-3709: NiFi flow lineage to Apache Atlas.
    Adding AtlasNiFiFlowLineage reporting task to create NiFi lineage in Atlas.
    See the additionalDetails.html for details.