[jira] [Assigned] (NIFIREG-54) UI shows Error when I delete a flow, even though the response from server is success

2017-12-15 Thread Scott Aslan (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFIREG-54?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Aslan reassigned NIFIREG-54:
--

Assignee: Scott Aslan

> UI shows Error when I delete a flow, even though the response from server is success
> ------------------------------------------------------------------------------------
>
>                 Key: NIFIREG-54
>                 URL: https://issues.apache.org/jira/browse/NIFIREG-54
>             Project: NiFi Registry
>          Issue Type: Bug
>    Affects Versions: 0.0.1
>            Reporter: Mark Payne
>            Assignee: Scott Aslan
>            Priority: Blocker
>
> When I go to the Registry UI and choose to delete a flow, I get back a "200 
> OK" response from the server, according to Google Chrome's Developer Tools. 
> However, I get an Error back in the UI saying "Error deleting ." 
> If I refresh the page, I do see that it was successfully deleted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (NIFIREG-54) UI shows Error when I delete a flow, even though the response from server is success

2017-12-15 Thread Scott Aslan (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFIREG-54?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Aslan resolved NIFIREG-54.

Resolution: Cannot Reproduce

> UI shows Error when I delete a flow, even though the response from server is success
> ------------------------------------------------------------------------------------
>
>                 Key: NIFIREG-54
>                 URL: https://issues.apache.org/jira/browse/NIFIREG-54
>             Project: NiFi Registry
>          Issue Type: Bug
>    Affects Versions: 0.0.1
>            Reporter: Mark Payne
>            Assignee: Scott Aslan
>            Priority: Blocker
>
> When I go to the Registry UI and choose to delete a flow, I get back a "200 
> OK" response from the server, according to Google Chrome's Developer Tools. 
> However, I get an Error back in the UI saying "Error deleting ." 
> If I refresh the page, I do see that it was successfully deleted.





[jira] [Closed] (NIFIREG-20) Improve overall page load times

2017-12-15 Thread Scott Aslan (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFIREG-20?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Aslan closed NIFIREG-20.
--
Resolution: Fixed

> Improve overall page load times
> -------------------------------
>
>                 Key: NIFIREG-20
>                 URL: https://issues.apache.org/jira/browse/NIFIREG-20
>             Project: NiFi Registry
>          Issue Type: Sub-task
>            Reporter: Scott Aslan
>            Assignee: Scott Aslan
>






[jira] [Commented] (NIFIREG-61) Add support for encrypted properties in configuration files

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFIREG-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16293438#comment-16293438
 ] 

ASF GitHub Bot commented on NIFIREG-61:
---

Github user alopresto commented on a diff in the pull request:

https://github.com/apache/nifi-registry/pull/51#discussion_r157324448
  
--- Diff: 
nifi-registry-properties/src/main/java/org/apache/nifi/registry/security/crypto/CryptoKeyLoader.java
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.security.crypto;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+public class CryptoKeyLoader {
+
+    private static final Logger logger = LoggerFactory.getLogger(CryptoKeyLoader.class);
+
+    private static final String BOOTSTRAP_KEY_PREFIX = "nifi.registry.bootstrap.sensitive.key=";
+
+    /**
+     * Returns the key (if any) used to encrypt sensitive properties.
+     * The key extracted from the bootstrap.conf file at the specified location.
+     *
+     * @param bootstrapPath the path to the bootstrap file
+     * @return the key in hexadecimal format, or {@link CryptoKeyProvider#EMPTY_KEY} if the key is null or empty
+     * @throws IOException if the file is not readable
+     */
+    public static String extractKeyFromBootstrapFile(String bootstrapPath) throws IOException {
+        File bootstrapFile;
+        if (StringUtils.isBlank(bootstrapPath)) {
+            logger.error("Cannot read from bootstrap.conf file to extract encryption key; location not specified");
+            throw new IOException("Cannot read from bootstrap.conf without file location");
+        } else {
+            bootstrapFile = new File(bootstrapPath);
+        }
+
+        String keyValue;
+        if (bootstrapFile.exists() && bootstrapFile.canRead()) {
+            try (Stream<String> stream = Files.lines(Paths.get(bootstrapFile.getAbsolutePath()))) {
+                Optional<String> keyLine = stream.filter(l -> l.startsWith(BOOTSTRAP_KEY_PREFIX)).findFirst();
+                if (keyLine.isPresent()) {
+                    keyValue = keyLine.get().split("=", 2)[1];
+                    keyValue = checkHexKey(keyValue);
+                } else {
+                    keyValue = CryptoKeyProvider.EMPTY_KEY;
+                }
+            } catch (IOException e) {
+                logger.error("Cannot read from bootstrap.conf file at {} to extract encryption key", bootstrapFile.getAbsolutePath());
+                throw new IOException("Cannot read from bootstrap.conf", e);
+            }
+        } else {
+            logger.error("Cannot read from bootstrap.conf file at {} to extract encryption key -- file is missing or permissions are incorrect", bootstrapFile.getAbsolutePath());
+            throw new IOException("Cannot read from bootstrap.conf");
+        }
+
+        if (CryptoKeyProvider.EMPTY_KEY.equals(keyValue)) {
+            logger.warn("No encryption key present in the bootstrap.conf file at {}", bootstrapFile.getAbsolutePath());
--- End diff --

I don't think this should be `WARN` level -- `INFO` is fine. 
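
To illustrate the suggestion, here is a minimal sketch of the key lookup with the missing-key message logged at `INFO`. It is not the code from the pull request: the class name `BootstrapKeySketch`, the method `extractKey`, and the empty-string value of `EMPTY_KEY` are assumptions made for illustration, the hex check is omitted, and `java.util.logging` stands in for SLF4J so that the example needs no external dependencies.

```java
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; names other than BOOTSTRAP_KEY_PREFIX are not from the pull request.
final class BootstrapKeySketch {

    private static final Logger logger = Logger.getLogger(BootstrapKeySketch.class.getName());

    static final String BOOTSTRAP_KEY_PREFIX = "nifi.registry.bootstrap.sensitive.key=";

    // Assumption: stands in for CryptoKeyProvider.EMPTY_KEY, whose real value is not shown in the diff.
    static final String EMPTY_KEY = "";

    // Takes the already-read lines of bootstrap.conf and returns the key value, or EMPTY_KEY.
    static String extractKey(final List<String> lines) {
        final Optional<String> keyLine = lines.stream()
                .filter(l -> l.startsWith(BOOTSTRAP_KEY_PREFIX))
                .findFirst();

        // split("=", 2) keeps everything after the first '=' as the value, even if it is empty.
        final String keyValue = keyLine.map(l -> l.split("=", 2)[1]).orElse(EMPTY_KEY);

        if (EMPTY_KEY.equals(keyValue)) {
            // A missing key is a valid configuration, so it is reported at INFO rather than WARN.
            logger.log(Level.INFO, "No encryption key present in the bootstrap.conf contents");
        }
        return keyValue;
    }
}
```

Taking the lines as a parameter keeps file handling out of the sketch; the method in the diff reads them from the file path instead.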


> Add support for encrypted properties in configuration files
> -----------------------------------------------------------
>
>                 Key: NIFIREG-61
>                 URL: https://issues.apache.org/jira/browse/NIFIREG-61
>             Project: NiFi Registry
>          Issue Type: New Feature
>            Reporter: Kevin Doran
>            Assignee: Kevin Doran
>             Fix For: 0.0.1
>
>
> The NiFi Registry server is configured by files on disk, e.g., 
> nifi-registry.properties, bootstrap.conf, and XML files 


[jira] [Commented] (NIFI-2169) Improve RouteText performance with pre-compilation of RegEx in certain cases

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292197#comment-16292197
 ] 

ASF GitHub Bot commented on NIFI-2169:
--

Github user mgaido91 commented on the issue:

https://github.com/apache/nifi/pull/2343
  
@markap14 I saw that you made most of the commits to it. Could you help me 
review this? Thanks.

PS: Travis CI is failing because it reaches the 4MB log length limit. I am 
not sure why this is happening. Can anyone help me? Thanks.


> Improve RouteText performance with pre-compilation of RegEx in certain cases
> ----------------------------------------------------------------------------
>
>                 Key: NIFI-2169
>                 URL: https://issues.apache.org/jira/browse/NIFI-2169
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 0.6.1
>            Reporter: Stephane Maarek
>            Assignee: Oleg Zhurakousky
>              Labels: beginner, easy
>
> When using RegEx matches for the RouteText processor (and possibly other 
> processors), the RegEx is recompiled every time the processor runs. The 
> RegEx could be precompiled or cached under certain conditions to improve 
> the performance of the processor.
> See email from Mark Payne:
> Re #2: The regular expression is compiled every time. This is done because 
> the Regex allows the Expression Language to be used, so the Regex could 
> actually be different for each FlowFile. That being said, it could certainly 
> be improved by either (a) pre-compiling in the case that no Expression 
> Language is used and/or (b) caching up to, say, 10 regexes once they are 
> compiled.
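
Option (b) above could be sketched roughly as follows. This is only an illustration of a small bounded cache of compiled patterns, not the code from the pull request: the class `CompiledPatternCache` and its methods are hypothetical names, and the least-recently-used eviction policy is an assumption, since the ticket only says "cache up to say 10".

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper, not part of NiFi: caches compiled patterns keyed by regex source.
final class CompiledPatternCache {

    private final Map<String, Pattern> cache;

    CompiledPatternCache(final int maxEntries) {
        // An access-ordered LinkedHashMap drops the least recently used entry
        // whenever an insertion pushes the size above maxEntries.
        this.cache = Collections.synchronizedMap(new LinkedHashMap<String, Pattern>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<String, Pattern> eldest) {
                return size() > maxEntries;
            }
        });
    }

    // Returns the cached Pattern for this regex, compiling it only on a cache miss.
    Pattern get(final String regex) {
        return cache.computeIfAbsent(regex, Pattern::compile);
    }

    // Number of patterns currently held.
    int cachedCount() {
        return cache.size();
    }
}
```

A processor would evaluate the Expression Language for each FlowFile first and then call `get` with the resulting regex string, so FlowFiles that resolve to the same regex share one compiled `Pattern`. For option (a), a property with no Expression Language could simply be compiled once when the processor is scheduled.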






[jira] [Commented] (NIFI-4005) Add support for Azure Shared Access Signature (SAS) Tokens

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292181#comment-16292181
 ] 

ASF GitHub Bot commented on NIFI-4005:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1886#discussion_r157141180
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public final class Azure {
+    public static final String BLOCK = "Block";
+    public static final String PAGE = "Page";
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key")
+            .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+                    "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+                    "There are certain risks in allowing the account key to be stored as a flowfile" +
+                    "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+                    "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+                    "In addition, the provenance repositories may be put on encrypted disk partitions.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name")
+            .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile" +
--- End diff --

A space is needed after 'as a flowfile'. The text is currently displayed as 
'as a flowfileattribute.'
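
A minimal illustration of the problem, with the description shortened and a hypothetical class name. String concatenation inserts nothing between fragments, so the space has to be part of one of the literals:

```java
// Hypothetical class; the strings are shortened from the description in the diff.
final class DescriptionConcatSketch {

    // As written in the diff: the first fragment ends without a space.
    static final String BROKEN = "to be stored as a flowfile" +
            "attribute.";

    // With the trailing space added to the first fragment.
    static final String FIXED = "to be stored as a flowfile " +
            "attribute.";

    public static void main(String[] args) {
        System.out.println(BROKEN); // to be stored as a flowfileattribute.
        System.out.println(FIXED);  // to be stored as a flowfile attribute.
    }
}
```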


> Add support for Azure Shared Access Signature (SAS) Tokens
> ----------------------------------------------------------
>
>                 Key: NIFI-4005
>                 URL: https://issues.apache.org/jira/browse/NIFI-4005
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 1.2.0
>            Reporter: Andrew Grande
>            Priority: Minor
>
> Azure's account name and key are treated as admin, 'root' access credentials. 
> If one has those, every container under this account is fully accessible. An 
> MSFT-recommended approach is to use SAS policies, which provide fine-grained 
> permission and object control, as well as a defined expiration.
> I already have working code, filing this ticket to formally track and submit 
> PR against next.



--
This 

[jira] [Commented] (NIFI-4005) Add support for Azure Shared Access Signature (SAS) Tokens

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292182#comment-16292182
 ] 

ASF GitHub Bot commented on NIFI-4005:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1886#discussion_r157142645
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public final class Azure {
+    public static final String BLOCK = "Block";
+    public static final String PAGE = "Page";
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key")
+            .description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+                    "one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+                    "There are certain risks in allowing the account key to be stored as a flowfile" +
+                    "attribute. While it does provide for a more flexible flow by allowing the account key to " +
+                    "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+                    "In addition, the provenance repositories may be put on encrypted disk partitions.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name")
+            .description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile" +
+                    "attribute. While it does provide for a more flexible flow by allowing the account name to " +
+                    "be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+                    "the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+                    "In addition, the provenance repositories may be put on encrypted disk partitions.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
+
+    public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container Name")
+            .description("Name of the Azure storage 

[jira] [Commented] (NIFI-4005) Add support for Azure Shared Access Signature (SAS) Tokens

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292180#comment-16292180
 ] 

ASF GitHub Bot commented on NIFI-4005:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1886#discussion_r157141861
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public final class Azure {
--- End diff --

The class name may be too broad IMHO. Perhaps `AzureStorageUtils` if it is 
intended to cover other storage services such as File, Queue or Table, or 
`AzureBlobUtils` to be more specific. Thoughts?


> Add support for Azure Shared Access Signature (SAS) Tokens
> ----------------------------------------------------------
>
>                 Key: NIFI-4005
>                 URL: https://issues.apache.org/jira/browse/NIFI-4005
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 1.2.0
>            Reporter: Andrew Grande
>            Priority: Minor
>
> Azure's account name and key are treated as admin, 'root' access credentials. 
> If one has those, every container under this account is fully accessible. An 
> MSFT-recommended approach is to use SAS policies, which provide fine-grained 
> permission and object control, as well as a defined expiration.
> I already have working code, filing this ticket to formally track and submit 
> PR against next.







[GitHub] nifi pull request #1886: NIFI-4005 Add support for Azure Shared Access Signa...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1886#discussion_r157142645
  
--- Diff: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/Azure.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageCredentials;
+import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public final class Azure {
+public static final String BLOCK = "Block";
+public static final String PAGE = "Page";
+
+public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("storage-account-key").displayName("Storage Account Key")
+.description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " +
+"one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " +
+"There are certain risks in allowing the account key to be stored as a flowfile" +
+"attribute. While it does provide for a more flexible flow by allowing the account key to " +
+"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+"In addition, the provenance repositories may be put on encrypted disk partitions.")
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(false).sensitive(true).build();
+
+public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("storage-account-name").displayName("Storage Account Name")
+.description("The storage account name.  There are certain risks in allowing the account name to be stored as a flowfile" +
+"attribute. While it does provide for a more flexible flow by allowing the account name to " +
+"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " +
+"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " +
+"In addition, the provenance repositories may be put on encrypted disk partitions.")
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
+
+public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("container-name").displayName("Container Name")
+.description("Name of the Azure storage container").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
+
+public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder()
+.name("SAS String")
--- End diff --

Please use name and displayName as other properties do. "SAS 

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157208066
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+private final Logger logger = LoggerFactory.getLogger(StandardAnalysisContext.class);
+private final NiFiFlow nifiFlow;
+private final ClusterResolver clusterResolver;
+private final ProvenanceRepository provenanceRepository;
+
+public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver clusterResolver,
+   ProvenanceRepository provenanceRepository) {
+this.nifiFlow = nifiFlow;
+this.clusterResolver = clusterResolver;
+this.provenanceRepository = provenanceRepository;
+}
+
+@Override
+public List findConnectionTo(String componentId) {
+return nifiFlow.getIncomingRelationShips(componentId);
+}
+
+@Override
+public List findConnectionFrom(String componentId) {
+return nifiFlow.getOutgoingRelationShips(componentId);
+}
+
+@Override
+public String getNiFiClusterName() {
+return nifiFlow.getClusterName();
+}
+
+@Override
+public ClusterResolver getClusterResolver() {
+return clusterResolver;
+}
+
+private ComputeLineageResult getLineageResult(long eventId, ComputeLineageSubmission submission) {
+final ComputeLineageResult result = submission.getResult();
+try {
+if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
+return result;
+}
+logger.warn("Lineage query for {} timed out.", new Object[]{eventId});
+} catch (InterruptedException e) {
+logger.warn("Lineage query for {} was interrupted due to {}.", new Object[]{eventId, e}, e);
+} finally {
+submission.cancel();
+}
+
+return null;
+}
+
+@Override
+public ComputeLineageResult queryLineage(long eventId) {
+final ComputeLineageSubmission submission = provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+public ComputeLineageResult findParents(long eventId) {
+final ComputeLineageSubmission submission = provenanceRepository.submitExpandParents(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+// NOTE: This user is required to avoid NullPointerException at PersistentProvenanceRepository.submitLineageComputation
+private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
--- End diff --

Actually this user can be anything. Since NiFi authentication and 
authorization are done at the web request layer, neither authentication nor 
authorization takes place when the provenance API is called directly. I confirmed 
that provenance query work 

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292582#comment-16292582
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157208859
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}
+
+

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157216281
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
+
  

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292577#comment-16292577
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157208324
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}
+
+

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157208324
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}
+
+public static NiFiAtlasClient getInstance() {
+if (nifiClient == null) {
+synchronized (NiFiAtlasClient.class) {
+if (nifiClient == null) {
+nifiClient = new 
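
The quoted `getInstance()` uses double-checked locking on the static `nifiClient` field, which the diff declares without `volatile`. Under the Java memory model, this idiom is only safe when the field is `volatile`. A minimal sketch of the pattern with that change follows; `LazyClient` and its members are hypothetical names for illustration, not the class from the pull request:

```java
// Hypothetical stand-in for a lazily created singleton; not the NiFiAtlasClient from the diff.
final class LazyClient {

    // volatile guarantees that a fully constructed instance is visible to other threads.
    private static volatile LazyClient instance;

    private LazyClient() {
    }

    static LazyClient getInstance() {
        LazyClient local = instance; // one volatile read on the fast path
        if (local == null) {
            synchronized (LazyClient.class) {
                local = instance; // re-check while holding the lock
                if (local == null) {
                    local = new LazyClient();
                    instance = local;
                }
            }
        }
        return local;
    }
}
```

Every call to `LazyClient.getInstance()` returns the same object. The local variable is optional and only avoids a second volatile read once the instance exists.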

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292625#comment-16292625
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157216102
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static 

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157216102
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
+
  

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157208859
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}
+
+public static NiFiAtlasClient getInstance() {
+if (nifiClient == null) {
+synchronized (NiFiAtlasClient.class) {
+if (nifiClient == null) {
+nifiClient = new 

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292614#comment-16292614
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157214953
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static 

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292612#comment-16292612
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157214860
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+@Override
+public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+if (refs == null || (refs.isEmpty())) {
+return;
+}
+
+if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
+processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+} else {
+addDataSetRefs(nifiFlow, refs);
+}
+
+}
+
+/**
+ * Creates a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received.
+ * Such an entity cannot be created in advance while analyzing the flow statically,
+ * because a ReportingTask cannot determine whether a component id refers to a RemoteGroupPort:
+ * ConnectionStatus is the only information available in ReportingContext,
+ * and it knows only the component id, not the component type.
+ * For example, there is no way to tell whether a connected component is a funnel or a RemoteGroupPort.
+ */
+private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
+
+// Create a RemoteInputPort Process.
+// event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
+// See NIFI-4571 for detail.
+final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
+final String portProcessId = event.getComponentId();
+
+final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+remotePortProcess.setName(event.getComponentType());
+remotePortProcess.addProcessor(portProcessId);
+
+// For RemoteInputPort, need to find the previous component connected to this port,
+// which passed this particular FlowFile.
+// That is only possible by calling lineage API.
+if (isRemoteInputPort) {
+final 

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157214860
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+@Override
+public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+if (refs == null || (refs.isEmpty())) {
+return;
+}
+
+if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
+processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+} else {
+addDataSetRefs(nifiFlow, refs);
+}
+
+}
+
+/**
+ * Creates a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event is received.
+ * Such an entity cannot be created in advance while analyzing the flow statically,
+ * because a ReportingTask cannot determine whether a component id refers to a RemoteGroupPort:
+ * ConnectionStatus is the only information available in ReportingContext,
+ * and it knows only the component id, not the component type.
+ * For example, there is no way to tell whether a connected component is a funnel or a RemoteGroupPort.
+ */
+private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
+
+// Create a RemoteInputPort Process.
+// event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
+// See NIFI-4571 for detail.
+final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
+final String portProcessId = event.getComponentId();
+
+final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+remotePortProcess.setName(event.getComponentType());
+remotePortProcess.addProcessor(portProcessId);
+
+// For RemoteInputPort, need to find the previous component connected to this port,
+// which passed this particular FlowFile.
+// That is only possible by calling lineage API.
+if (isRemoteInputPort) {
+final ProvenanceEventRecord previousEvent = findPreviousProvenanceEvent(analysisContext, event);
+if (previousEvent == null) {
+logger.warn("Previous event was not found: {}", new Object[]{event});
+
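
The selection logic in the quoted `processRemotePortEvent` can be sketched in isolation: for a "Remote Input Port" event the port data set is taken from the analyzed outputs, and for a "Remote Output Port" event from the analyzed inputs. The class below is a simplified, hypothetical stand-in that uses plain strings instead of `Referenceable` and `DataSetRefs`. The `Optional` return for an empty set is an assumption added here; the quoted code calls `iterator().next()` directly.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified stand-in for the port selection in the quoted diff.
final class RemotePortSelection {

    static final String REMOTE_INPUT_PORT = "Remote Input Port";
    static final String REMOTE_OUTPUT_PORT = "Remote Output Port";

    private RemotePortSelection() {
    }

    // Mirrors the component-type check in processEvent.
    static boolean isRemotePort(String componentType) {
        return REMOTE_INPUT_PORT.equals(componentType)
                || REMOTE_OUTPUT_PORT.equals(componentType);
    }

    // Remote input port events use the first analyzed output;
    // remote output port events use the first analyzed input.
    static Optional<String> selectPortDataSet(String componentType,
                                              Set<String> inputs,
                                              Set<String> outputs) {
        if (!isRemotePort(componentType)) {
            return Optional.empty();
        }
        Set<String> candidates = REMOTE_INPUT_PORT.equals(componentType) ? outputs : inputs;
        // Assumption: report "nothing found" instead of throwing on an empty set.
        return candidates.stream().findFirst();
    }
}
```

Returning `Optional` keeps the caller responsible for deciding what to do when the analyzer produced no matching data set, which is one possible design and not necessarily what the pull request does.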

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157214953
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
+
  

[GitHub] nifi pull request #2345: NIFI-4629: Put flowfiles without the grouping attri...

2017-12-15 Thread mgaido91
GitHub user mgaido91 opened a pull request:

https://github.com/apache/nifi/pull/2345

NIFI-4629: Put flowfiles without the grouping attribute in the default group

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?  N/A
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly? N/A
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly? N/A
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties? N/A

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered? N/A

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mgaido91/nifi NIFI-4629

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2345.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2345


commit 96706c5a48045c967c26fb042559db6466f3681b
Author: Marco Gaido 
Date:   2017-12-15T11:59:16Z

NIFI-4629: Put flowfiles without the grouping attribute in the default group




---


[jira] [Commented] (NIFI-4629) Non-existing attribute in ControlRate configuration causes NullPointerException

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292410#comment-16292410
 ] 

ASF GitHub Bot commented on NIFI-4629:
--

GitHub user mgaido91 opened a pull request:

https://github.com/apache/nifi/pull/2345

NIFI-4629: Put flowfiles without the grouping attribute in the default group

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?  N/A
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly? N/A
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly? N/A
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties? N/A

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered? N/A

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mgaido91/nifi NIFI-4629

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2345.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2345


commit 96706c5a48045c967c26fb042559db6466f3681b
Author: Marco Gaido 
Date:   2017-12-15T11:59:16Z

NIFI-4629: Put flowfiles without the grouping attribute in the default group




> Non-existing attribute in ControlRate configuration causes 
> NullPointerException
> ---
>
> Key: NIFI-4629
> URL: https://issues.apache.org/jira/browse/NIFI-4629
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Configuration, Core UI
>Affects Versions: 1.4.0
> Environment: Ubuntu LTS 16.04
> openjdk-8-jre
>Reporter: Fredrik Sko
>Priority: Critical
>  Labels: ControlRate, NullPointerException, Processor
> Attachments: ControlRate-NP.xml
>
>
> When using the ControlRate processor, defining the "Grouping Attribute" with 
> a missing/non-existing attribute name produces NullPointerException errors.
> Processor configuration:
> {quote}
> Rate Control Criteria: flowfile count
> Maximum rate: 10
> Rate Controlled Attributes: (No value set)
> Time Duration: 1 min
> Grouping Attribute: foobar
> {quote}
> ControlRate with the following configuration when sent a flowfile without the 
> attribute {{foobar}} generates the following error:
> bq. ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] 
> ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] failed to process due 
> to java.lang.NullPointerException; rolling back session: null
> Additionally, the incoming flowfiles now ends up in some "dead" state where 
> I'm unable to even empty the queue.
> A simple template for reproduction is attached.
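
The failure described in this report, and the remedy named in the pull request title ("put flowfiles without the grouping attribute in the default group"), can be sketched in plain Java. This is a minimal illustration and not the code from the pull request: it assumes per-group state is held in a ConcurrentHashMap, which throws NullPointerException for a null key, and every name in it (ControlRateGroupingSketch, DEFAULT_GROUP, resolveGroup, record) is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not the actual ControlRate implementation.
final class ControlRateGroupingSketch {

    // Assumed name for the fallback group; chosen for illustration only.
    static final String DEFAULT_GROUP = "__default_group__";

    // ConcurrentHashMap rejects null keys, so looking up a missing
    // attribute value directly would raise a NullPointerException.
    private final ConcurrentHashMap<String, AtomicLong> countsByGroup = new ConcurrentHashMap<>();

    // Map a flowfile's attributes to a group key, falling back to the
    // default group when the grouping attribute is unset or absent.
    static String resolveGroup(Map<String, String> attributes, String groupingAttributeName) {
        if (groupingAttributeName == null) {
            return DEFAULT_GROUP;
        }
        String value = attributes.get(groupingAttributeName);
        return value != null ? value : DEFAULT_GROUP;
    }

    // Count one flowfile against its group and return the group's running total.
    long record(Map<String, String> attributes, String groupingAttributeName) {
        String group = resolveGroup(attributes, groupingAttributeName);
        return countsByGroup.computeIfAbsent(group, g -> new AtomicLong()).incrementAndGet();
    }

    public static void main(String[] args) {
        ControlRateGroupingSketch sketch = new ControlRateGroupingSketch();

        Map<String, String> withoutAttribute = new HashMap<>();
        Map<String, String> withAttribute = new HashMap<>();
        withAttribute.put("foobar", "tenant-a");

        // Neither call throws: the flowfile lacking "foobar" is counted in the default group.
        System.out.println(sketch.record(withoutAttribute, "foobar"));
        System.out.println(sketch.record(withAttribute, "foobar"));
    }
}
```

With this kind of fallback, flowfiles that lack the configured attribute share a single rate limit rather than failing the session.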



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4619) AWSCredentialsProviderControllerService Does Not Apply Expression Language

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292359#comment-16292359
 ] 

ASF GitHub Bot commented on NIFI-4619:
--

GitHub user mgaido91 opened a pull request:

https://github.com/apache/nifi/pull/2344

NIFI-4619: Enable expression language on 
AWSCredentialsProviderControllerService properties

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? N/A 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly? N/A
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly? N/A
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties? N/A

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered? N/A

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mgaido91/nifi NIFI-4619

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2344


commit 1888a1b83bd1813d296ae3fd2c789b86b973385e
Author: Marco Gaido 
Date:   2017-12-15T10:06:13Z

NIFI-4619: Enable expression language on 
AWSCredentialsProviderControllerService properties




> AWSCredentialsProviderControllerService Does Not Apply Expression Language
> --
>
> Key: NIFI-4619
> URL: https://issues.apache.org/jira/browse/NIFI-4619
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.2.0, 1.1.1, 1.0.1, 1.3.0, 1.4.0
>Reporter: James Wing
>Priority: Minor
>
> As [mentioned on the users mailing 
> list|https://mail-archives.apache.org/mod_mbox/nifi-users/201711.mbox/%3CCAHdHwgxOFF_ExRXkK7j3mc3X8g3Qpq6fbQNge9p7AWu60uk61w%40mail.gmail.com%3E],
>  the AWSCredentialsProviderControllerService does not actually apply 
> expression language for properties documented as supporting expression 
> language (Access Key, Secret Key, Profile Name).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-4619) AWSCredentialsProviderControllerService Does Not Apply Expression Language

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292365#comment-16292365
 ] 

ASF GitHub Bot commented on NIFI-4619:
--

Github user mgaido91 commented on the issue:

https://github.com/apache/nifi/pull/2344
  
@jvwing @mans2singh may I kindly ask you some help reviewing this? I saw 
you are the last contributors on it... Thanks!


> AWSCredentialsProviderControllerService Does Not Apply Expression Language
> --
>
> Key: NIFI-4619
> URL: https://issues.apache.org/jira/browse/NIFI-4619
> Project: Apache NiFi
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.2.0, 1.1.1, 1.0.1, 1.3.0, 1.4.0
>Reporter: James Wing
>Priority: Minor
>
> As [mentioned on the users mailing 
> list|https://mail-archives.apache.org/mod_mbox/nifi-users/201711.mbox/%3CCAHdHwgxOFF_ExRXkK7j3mc3X8g3Qpq6fbQNge9p7AWu60uk61w%40mail.gmail.com%3E],
>  the AWSCredentialsProviderControllerService does not actually apply 
> expression language for properties documented as supporting expression 
> language (Access Key, Secret Key, Profile Name).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi issue #2344: NIFI-4619: Enable expression language on AWSCredentialsPro...

2017-12-15 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/nifi/pull/2344
  
@jvwing @mans2singh may I kindly ask you some help reviewing this? I saw 
you are the last contributors on it... Thanks!


---


[GitHub] nifi pull request #2344: NIFI-4619: Enable expression language on AWSCredent...

2017-12-15 Thread mgaido91
GitHub user mgaido91 opened a pull request:

https://github.com/apache/nifi/pull/2344

NIFI-4619: Enable expression language on 
AWSCredentialsProviderControllerService properties

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? N/A 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly? N/A
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly? N/A
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties? N/A

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered? N/A

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mgaido91/nifi NIFI-4619

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2344


commit 1888a1b83bd1813d296ae3fd2c789b86b973385e
Author: Marco Gaido 
Date:   2017-12-15T10:06:13Z

NIFI-4619: Enable expression language on 
AWSCredentialsProviderControllerService properties




---


[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157217791
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
+
  

[GitHub] nifi-minifi-cpp issue #219: MINIFICPP-330 Implemented Expression Language su...

2017-12-15 Thread achristianson
Github user achristianson commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/219
  
@phrocker yes, if they call substringBefore on an attr with no other 
arguments, then they should get an "attempted to call incomplete function." 
I'll add a test case for that.


---


[GitHub] nifi issue #2343: NIFI-2169: Cache compiled regexp for RouteText

2017-12-15 Thread joewitt
Github user joewitt commented on the issue:

https://github.com/apache/nifi/pull/2343
  
@mgaido91 it failed due to a checkstyle issue.  You'll want to look at the 
raw logs and scroll to the bottom.  The key issue is

[WARNING] 
src/test/java/org/apache/nifi/processors/standard/TestRouteText.java:[34,8] 
(imports) UnusedImports: Unused import - 
org.apache.nifi.util.MockProcessSession.


---


[jira] [Commented] (NIFI-4629) Non-existing attribute in ControlRate configuration causes NullPointerException

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292685#comment-16292685
 ] 

ASF GitHub Bot commented on NIFI-4629:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2345


> Non-existing attribute in ControlRate configuration causes 
> NullPointerException
> ---
>
> Key: NIFI-4629
> URL: https://issues.apache.org/jira/browse/NIFI-4629
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Configuration, Core UI
>Affects Versions: 1.4.0
> Environment: Ubuntu LTS 16.04
> openjdk-8-jre
>Reporter: Fredrik Sko
>Priority: Critical
>  Labels: ControlRate, NullPointerException, Processor
> Attachments: ControlRate-NP.xml
>
>
> When using the ControlRate processor, defining the "Grouping Attribute" with 
> a missing/non-existing attribute name produces NullPointerException errors.
> Processor configuration:
> {quote}
> Rate Control Criteria: flowfile count
> Maximum rate: 10
> Rate Controlled Attributes: (No value set)
> Time Duration: 1 min
> Grouping Attribute: foobar
> {quote}
> ControlRate with the following configuration when sent a flowfile without the 
> attribute {{foobar}} generates the following error:
> bq. ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] 
> ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] failed to process due 
> to java.lang.NullPointerException; rolling back session: null
> Additionally, the incoming flowfiles now ends up in some "dead" state where 
> I'm unable to even empty the queue.
> A simple template for reproduction is attached.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (NIFI-4629) Non-existing attribute in ControlRate configuration causes NullPointerException

2017-12-15 Thread Joseph Witt (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFI-4629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph Witt updated NIFI-4629:
--
Status: Patch Available  (was: Open)

> Non-existing attribute in ControlRate configuration causes 
> NullPointerException
> ---
>
> Key: NIFI-4629
> URL: https://issues.apache.org/jira/browse/NIFI-4629
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Configuration, Core UI
>Affects Versions: 1.4.0
> Environment: Ubuntu LTS 16.04
> openjdk-8-jre
>Reporter: Fredrik Sko
>Priority: Critical
>  Labels: ControlRate, NullPointerException, Processor
> Attachments: ControlRate-NP.xml
>
>
> When using the ControlRate processor, defining the "Grouping Attribute" with 
> a missing/non-existing attribute name produces NullPointerException errors.
> Processor configuration:
> {quote}
> Rate Control Criteria: flowfile count
> Maximum rate: 10
> Rate Controlled Attributes: (No value set)
> Time Duration: 1 min
> Grouping Attribute: foobar
> {quote}
> ControlRate with the following configuration when sent a flowfile without the 
> attribute {{foobar}} generates the following error:
> bq. ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] 
> ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] failed to process due 
> to java.lang.NullPointerException; rolling back session: null
> Additionally, the incoming flowfiles now ends up in some "dead" state where 
> I'm unable to even empty the queue.
> A simple template for reproduction is attached.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (NIFI-4629) Non-existing attribute in ControlRate configuration causes NullPointerException

2017-12-15 Thread Joseph Witt (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFI-4629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph Witt updated NIFI-4629:
--
Resolution: Fixed
Status: Resolved  (was: Patch Available)

+1 merged to master.  Thanks for your first contrib Marco!

> Non-existing attribute in ControlRate configuration causes 
> NullPointerException
> ---
>
> Key: NIFI-4629
> URL: https://issues.apache.org/jira/browse/NIFI-4629
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Configuration, Core UI
>Affects Versions: 1.4.0
> Environment: Ubuntu LTS 16.04
> openjdk-8-jre
>Reporter: Fredrik Sko
>Priority: Critical
>  Labels: ControlRate, NullPointerException, Processor
> Attachments: ControlRate-NP.xml
>
>
> When using the ControlRate processor, defining the "Grouping Attribute" with 
> a missing/non-existing attribute name produces NullPointerException errors.
> Processor configuration:
> {quote}
> Rate Control Criteria: flowfile count
> Maximum rate: 10
> Rate Controlled Attributes: (No value set)
> Time Duration: 1 min
> Grouping Attribute: foobar
> {quote}
> ControlRate with the following configuration when sent a flowfile without the 
> attribute {{foobar}} generates the following error:
> bq. ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] 
> ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] failed to process due 
> to java.lang.NullPointerException; rolling back session: null
> Additionally, the incoming flowfiles now ends up in some "dead" state where 
> I'm unable to even empty the queue.
> A simple template for reproduction is attached.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157234799
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.AtlasNiFiFlowLineage/additionalDetails.html
 ---
@@ -0,0 +1,541 @@
+
+
+
+
+
+AtlasNiFiFlowLineage
+
+
+
+
+AtlasNiFiFlowLineage
+
+Table of contents:
+
+
+Information reported to Atlas
+NiFi Atlas Types
+Cluster Name Resolution
+NiFi flow structure
+
+Path Separation Logic
+
+
+NiFi data lineage
+
+NiFi Lineage Strategy
+NiFi Provenance Event Analysis
+Supported DataSets and Processors
+
+
+How it runs in NiFi cluster
+Limitations
+Atlas Server Configurations
+Atlas Server Emulator
+
+
+Information reported to Atlas
+This reporting task stores two types of NiFi flow information, 'NiFi flow structure' and 'NiFi data lineage'.
+
+'NiFi flow structure' tells what components are running within a NiFi flow and how these are connected. It is reported by analyzing the current NiFi flow structure, specifically NiFi component relationships.
+
+'NiFi data lineage' tells what part of a NiFi flow interacts with different DataSets such as HDFS files or Hive tables. It is reported by analyzing NiFi provenance events.
+
+
+
+Technically, each type of information is sent using a different protocol, Atlas REST API v2 and notification via a Kafka topic, as shown in the image above.
+
+
+As both information types use the same NiFi Atlas Types and Cluster Name Resolution concepts, it is recommended to start reading those sections first.
+
+NiFi Atlas Types
+
+This reporting task creates the following NiFi-specific types in the Atlas type system when it runs, if these type definitions are not found.
+
+Green boxes represent sub-types of DataSet and blue ones are sub-types of Process. Gray lines represent entity ownership.
+Red lines represent lineage.
+
+
+
+
+nifi_flow
+Represents a NiFi data flow.
+As shown in the above diagram, nifi_flow owns other nifi_component types.
+This owning relationship is defined by the Atlas 'owned' constraint so that when a 'nifi_flow' entity is removed, all owned NiFi component entities are removed in a cascading manner.
+When this reporting task runs, it analyzes and traverses the entire flow structure, and creates NiFi component entities in Atlas.
+On later runs, it compares the current flow structure with the one stored in Atlas to figure out if any changes have been made since the last time the flow was reported. The reporting task updates NiFi component entities in Atlas if needed.
+NiFi components that are removed from a NiFi flow also get deleted from Atlas.
+However, those entities can still be seen in Atlas search results or lineage graphs since Atlas uses 'Soft Delete' by default.
+See Atlas Delete Handler for further detail.
+
+Attributes:
+
+qualifiedName: Root ProcessGroup ID@clusterName (e.g. 
86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1)
+name: Name of the Root ProcessGroup.
+url: URL of the NiFi instance. This can be specified 
via reporting task 'NiFi URL for Atlas' property.
+
+
+
+nifi_flow_path Part of a NiFi data flow containing one 
or more processing NiFi components such as Processors and RemoteGroupPorts. The 
reporting task divides a NiFi flow into multiple flow paths. See Path Separation Logic for details.
+Attributes:
+
+qualifiedName: The first NiFi component Id in a 
path@clusterName (e.g. 529e6722-9b49-3b66-9c94-00da9863ca2d@cl1)
+name: NiFi component names within a path are 
concatenated (e.g. GenerateFlowFile, PutFile, LogAttribute)
+url: A deep link to the first NiFi component in 
corresponding NiFi UI
+
+
+
+
--- End diff --

Forgot to delete the TODO. I was willing to write about the detail, but 
it's too 
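
The qualifiedName attributes described in the quoted documentation follow a `componentId@clusterName` layout. A minimal sketch of composing and splitting such names is shown below. The class `QualifiedNames` and its method names are hypothetical and are not taken from the pull request. Splitting on the last `@` is an assumption made here for illustration.

```java
/** Hypothetical helper for names of the form componentId@clusterName. */
class QualifiedNames {

    /** Joins a component id and a cluster name, e.g. "529e...@cl1". */
    static String toQualifiedName(final String componentId, final String clusterName) {
        return componentId + "@" + clusterName;
    }

    /** Returns the part before the last '@'. */
    static String componentId(final String qualifiedName) {
        return qualifiedName.substring(0, separatorIndex(qualifiedName));
    }

    /** Returns the part after the last '@'. */
    static String clusterName(final String qualifiedName) {
        return qualifiedName.substring(separatorIndex(qualifiedName) + 1);
    }

    private static int separatorIndex(final String qualifiedName) {
        final int idx = qualifiedName.lastIndexOf('@');
        if (idx < 0) {
            throw new IllegalArgumentException("Not a qualified name: " + qualifiedName);
        }
        return idx;
    }

    public static void main(final String[] args) {
        final String qn = toQualifiedName("86420a14-2fab-3e1e-4331-fb6ab42f58e0", "cl1");
        System.out.println(qn + " -> " + componentId(qn) + " / " + clusterName(qn));
    }
}
```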

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292739#comment-16292739
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157238842
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+private final Logger logger = 
LoggerFactory.getLogger(StandardAnalysisContext.class);
+private final NiFiFlow nifiFlow;
+private final ClusterResolver clusterResolver;
+private final ProvenanceRepository provenanceRepository;
+
+public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver 
clusterResolver,
+   ProvenanceRepository 
provenanceRepository) {
+this.nifiFlow = nifiFlow;
+this.clusterResolver = clusterResolver;
+this.provenanceRepository = provenanceRepository;
+}
+
+@Override
+public List findConnectionTo(String componentId) {
+return nifiFlow.getIncomingRelationShips(componentId);
+}
+
+@Override
+public List findConnectionFrom(String componentId) {
+return nifiFlow.getOutgoingRelationShips(componentId);
+}
+
+@Override
+public String getNiFiClusterName() {
+return nifiFlow.getClusterName();
+}
+
+@Override
+public ClusterResolver getClusterResolver() {
+return clusterResolver;
+}
+
+private ComputeLineageResult getLineageResult(long eventId, 
ComputeLineageSubmission submission) {
+final ComputeLineageResult result = submission.getResult();
+try {
+if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
+return result;
+}
+logger.warn("Lineage query for {} timed out.", eventId);
+} catch (InterruptedException e) {
+logger.warn("Lineage query for {} was interrupted due to {}.", 
eventId, e, e);
+} finally {
+submission.cancel();
+}
+
+return null;
+}
+
+@Override
+public ComputeLineageResult queryLineage(long eventId) {
+final ComputeLineageSubmission submission = 
provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+public ComputeLineageResult findParents(long eventId) {
+final ComputeLineageSubmission submission = 
provenanceRepository.submitExpandParents(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+// NOTE: This user is required to avoid NullPointerException at 
PersistentProvenanceRepository.submitLineageComputation
+private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
--- End diff --

I see. What 
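
The `getLineageResult` method in the diff above waits for a result with a timeout and cancels the submission in a `finally` block. The same pattern can be sketched with the JDK's `Future`. The class `BoundedWait` and the method `awaitOrNull` are hypothetical names. Restoring the interrupt flag is an addition that the quoted code does not make.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: wait for a result for a bounded time, then always cancel. */
class BoundedWait {

    /** Returns the result, or null on timeout, interruption, or failure. */
    static <T> T awaitOrNull(final Future<T> future, final long timeout, final TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            // Preserve the interrupt so callers further up can react to it.
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            return null;
        } finally {
            // Has no effect if the future already completed.
            future.cancel(true);
        }
    }

    public static void main(final String[] args) {
        System.out.println(awaitOrNull(CompletableFuture.completedFuture("done"), 1, TimeUnit.SECONDS));
    }
}
```

Cancelling unconditionally keeps the cleanup in one place, which matches the shape of the quoted method.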

[jira] [Commented] (NIFI-2169) Improve RouteText performance with pre-compilation of RegEx in certain cases

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292753#comment-16292753
 ] 

ASF GitHub Bot commented on NIFI-2169:
--

Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2343#discussion_r157241548
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
 ---
@@ -209,6 +215,30 @@
 private volatile Map propertyMap = new 
HashMap<>();
 private volatile Pattern groupingRegex = null;
 
+@VisibleForTesting
+final static int PATTERNS_CACHE_MAXIMUM_ENTRIES = 10;
+
+/**
+ * LRU cache for the compiled patterns. The size of the cache is 
determined by the value of
+ * {@link #PATTERNS_CACHE_MAXIMUM_ENTRIES}.
+ */
+@VisibleForTesting
+final ConcurrentMap, Pattern> patternsCache = 
CacheBuilder.newBuilder()
--- End diff --

yes, this was my other option. I will do this, thanks.


> Improve RouteText performance with pre-compilation of RegEx in certain cases
> 
>
> Key: NIFI-2169
> URL: https://issues.apache.org/jira/browse/NIFI-2169
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Core Framework
>Affects Versions: 0.6.1
>Reporter: Stephane Maarek
>Assignee: Oleg Zhurakousky
>  Labels: beginner, easy
>
> When using RegEx matches for the RouteText processor (and possibly other 
> processors), the RegEx gets recompiled every time the processor works. The 
> RegEx could be precompiled / cached under certain conditions, in order to 
> improve the performance of the processor
> See email from Mark Payne:
> Re #2: The regular expression is compiled every time. This is done, though, 
> because the Regex allows the Expression
> Language to be used, so the Regex could actually be different for each 
> FlowFile. That being said, it could certainly be
> improved by either (a) pre-compiling in the case that no Expression Language 
> is used and/or (b) cache up to say 10
> Regex'es once they are compiled. 
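
Option (b) from the quoted email, caching a small number of compiled regexes, can be sketched with JDK classes only. This is not the code from the pull request, which uses Guava's `CacheBuilder`. The class `PatternCache` and its key format are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical helper: a small LRU cache of compiled regular expressions. */
class PatternCache {
    private final Map<String, Pattern> cache;

    PatternCache(final int maxEntries) {
        // accessOrder=true keeps entries ordered from least to most recently used;
        // removeEldestEntry drops the least recently used one when the cap is exceeded.
        this.cache = Collections.synchronizedMap(new LinkedHashMap<String, Pattern>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<String, Pattern> eldest) {
                return size() > maxEntries;
            }
        });
    }

    /** Returns a cached Pattern, compiling it only on a cache miss. */
    Pattern get(final String regex, final boolean ignoreCase) {
        // The case flag is part of the key so both variants can coexist.
        final String key = (ignoreCase ? "i:" : "s:") + regex;
        return cache.computeIfAbsent(key,
                k -> Pattern.compile(regex, ignoreCase ? Pattern.CASE_INSENSITIVE : 0));
    }

    int size() {
        return cache.size();
    }

    public static void main(final String[] args) {
        final PatternCache patterns = new PatternCache(10);
        System.out.println(patterns.get("a+", true).matcher("AAA").matches());
    }
}
```

Because the regex may contain Expression Language and differ per FlowFile, the cache is keyed by the evaluated regex string rather than by the property value.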





[GitHub] nifi pull request #2343: NIFI-2169: Cache compiled regexp for RouteText

2017-12-15 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2343#discussion_r157241548
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
 ---
@@ -209,6 +215,30 @@
 private volatile Map propertyMap = new 
HashMap<>();
 private volatile Pattern groupingRegex = null;
 
+@VisibleForTesting
+final static int PATTERNS_CACHE_MAXIMUM_ENTRIES = 10;
+
+/**
+ * LRU cache for the compiled patterns. The size of the cache is 
determined by the value of
+ * {@link #PATTERNS_CACHE_MAXIMUM_ENTRIES}.
+ */
+@VisibleForTesting
+final ConcurrentMap, Pattern> patternsCache = 
CacheBuilder.newBuilder()
--- End diff --

yes, this was my other option. I will do this, thanks.


---


[jira] [Commented] (MINIFICPP-341) Add CentOS6 build instructions

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292752#comment-16292752
 ] 

ASF GitHub Bot commented on MINIFICPP-341:
--

Github user achristianson commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/216
  
@phrocker resolved.


> Add CentOS6 build instructions
> --
>
> Key: MINIFICPP-341
> URL: https://issues.apache.org/jira/browse/MINIFICPP-341
> Project: NiFi MiNiFi C++
>  Issue Type: Improvement
>Reporter: Andrew Christianson
>Assignee: Andrew Christianson
>
> Add docs for building on CentOS6





[GitHub] nifi-minifi-cpp issue #216: MINIFICPP-341 Add CentOS6 build instructions

2017-12-15 Thread achristianson
Github user achristianson commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/216
  
@phrocker resolved.


---


[jira] [Commented] (NIFI-2169) Improve RouteText performance with pre-compilation of RegEx in certain cases

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292764#comment-16292764
 ] 

ASF GitHub Bot commented on NIFI-2169:
--

Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2343#discussion_r157242761
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
 ---
@@ -209,6 +215,30 @@
 private volatile Map propertyMap = new 
HashMap<>();
 private volatile Pattern groupingRegex = null;
 
+@VisibleForTesting
+final static int PATTERNS_CACHE_MAXIMUM_ENTRIES = 10;
+
+/**
+ * LRU cache for the compiled patterns. The size of the cache is 
determined by the value of
+ * {@link #PATTERNS_CACHE_MAXIMUM_ENTRIES}.
+ */
+@VisibleForTesting
+final ConcurrentMap, Pattern> patternsCache = 
CacheBuilder.newBuilder()
+.maximumSize(PATTERNS_CACHE_MAXIMUM_ENTRIES)
+., Pattern>build()
+.asMap();
+
+private final Function, Pattern> compileRegex = 
ignoreCaseAndRegex -> {
--- End diff --

for the `Pair`, I'll get rid of it following your suggestion above.

For the `Function`, my goal in defining the function as a parameter was to 
avoid the creation of a new `Function` object at every invocation, always using 
the same one. What do you think?
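
The question above can be illustrated with a minimal sketch that holds the mapping function in a field and passes it to `computeIfAbsent`. The class `RegexCompiler` is hypothetical and simplifies the key to a plain regex string. As far as I know, current HotSpot JVMs also reuse a single instance for a non-capturing lambda written inline, but that is an implementation detail and not a language guarantee.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.regex.Pattern;

/** Hypothetical example: one Function instance shared by all cache lookups. */
class RegexCompiler {
    private final ConcurrentMap<String, Pattern> cache = new ConcurrentHashMap<>();

    // Created once per RegexCompiler instance and reused for every lookup.
    private final Function<String, Pattern> compile = Pattern::compile;

    Pattern patternFor(final String regex) {
        // The function only runs when the key is absent.
        return cache.computeIfAbsent(regex, compile);
    }

    public static void main(final String[] args) {
        final RegexCompiler compiler = new RegexCompiler();
        System.out.println(compiler.patternFor("\\d+") == compiler.patternFor("\\d+"));
    }
}
```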


> Improve RouteText performance with pre-compilation of RegEx in certain cases
> 
>
> Key: NIFI-2169
> URL: https://issues.apache.org/jira/browse/NIFI-2169
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Core Framework
>Affects Versions: 0.6.1
>Reporter: Stephane Maarek
>Assignee: Oleg Zhurakousky
>  Labels: beginner, easy
>
> When using RegEx matches for the RouteText processor (and possibly other 
> processors), the RegEx gets recompiled every time the processor works. The 
> RegEx could be precompiled / cached under certain conditions, in order to 
> improve the performance of the processor
> See email from Mark Payne:
> Re #2: The regular expression is compiled every time. This is done, though, 
> because the Regex allows the Expression
> Language to be used, so the Regex could actually be different for each 
> FlowFile. That being said, it could certainly be
> improved by either (a) pre-compiling in the case that no Expression Language 
> is used and/or (b) cache up to say 10
> Regex'es once they are compiled. 





[GitHub] nifi pull request #2343: NIFI-2169: Cache compiled regexp for RouteText

2017-12-15 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2343#discussion_r157242761
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
 ---
@@ -209,6 +215,30 @@
 private volatile Map propertyMap = new 
HashMap<>();
 private volatile Pattern groupingRegex = null;
 
+@VisibleForTesting
+final static int PATTERNS_CACHE_MAXIMUM_ENTRIES = 10;
+
+/**
+ * LRU cache for the compiled patterns. The size of the cache is 
determined by the value of
+ * {@link #PATTERNS_CACHE_MAXIMUM_ENTRIES}.
+ */
+@VisibleForTesting
+final ConcurrentMap, Pattern> patternsCache = 
CacheBuilder.newBuilder()
+.maximumSize(PATTERNS_CACHE_MAXIMUM_ENTRIES)
+., Pattern>build()
+.asMap();
+
+private final Function, Pattern> compileRegex = 
ignoreCaseAndRegex -> {
--- End diff --

for the `Pair`, I'll get rid of it following your suggestion above.

For the `Function`, my goal in defining the function as a parameter was to 
avoid the creation of a new `Function` object at every invocation, always using 
the same one. What do you think?


---


[jira] [Commented] (NIFI-4629) Non-existing attribute in ControlRate configuration causes NullPointerException

2017-12-15 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292762#comment-16292762
 ] 

Marco Gaido commented on NIFI-4629:
---

Thanks [~joewitt]

> Non-existing attribute in ControlRate configuration causes 
> NullPointerException
> ---
>
> Key: NIFI-4629
> URL: https://issues.apache.org/jira/browse/NIFI-4629
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Configuration, Core UI
>Affects Versions: 1.4.0
> Environment: Ubuntu LTS 16.04
> openjdk-8-jre
>Reporter: Fredrik Sko
>Priority: Critical
>  Labels: ControlRate, NullPointerException, Processor
> Fix For: 1.5.0
>
> Attachments: ControlRate-NP.xml
>
>
> When using the ControlRate processor, defining the "Grouping Attribute" with 
> a missing/non-existing attribute name produces NullPointerException errors.
> Processor configuration:
> {quote}
> Rate Control Criteria: flowfile count
> Maximum rate: 10
> Rate Controlled Attributes: (No value set)
> Time Duration: 1 min
> Grouping Attribute: foobar
> {quote}
> ControlRate with the following configuration when sent a flowfile without the 
> attribute {{foobar}} generates the following error:
> bq. ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] 
> ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] failed to process due 
> to java.lang.NullPointerException; rolling back session: null
> Additionally, the incoming flowfiles now ends up in some "dead" state where 
> I'm unable to even empty the queue.
> A simple template for reproduction is attached.
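
The report does not say where the exception originates. One plausible cause, stated here purely as an assumption, is a null group value being used as a key in a `ConcurrentHashMap`, which rejects null keys. The sketch below shows a null-safe grouping lookup under that assumption. The class `GroupCounter` and the `DEFAULT_GROUP` constant are hypothetical and are not ControlRate's actual code.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical example: count items per group, tolerating a missing grouping attribute. */
class GroupCounter {
    static final String DEFAULT_GROUP = "__default_group__";

    private final ConcurrentMap<String, AtomicLong> counts = new ConcurrentHashMap<>();

    long increment(final Map<String, String> attributes, final String groupingAttribute) {
        String group = groupingAttribute == null ? null : attributes.get(groupingAttribute);
        if (group == null) {
            // ConcurrentHashMap throws NullPointerException for null keys,
            // so fall back to a shared default group instead.
            group = DEFAULT_GROUP;
        }
        return counts.computeIfAbsent(group, k -> new AtomicLong()).incrementAndGet();
    }

    public static void main(final String[] args) {
        final GroupCounter counter = new GroupCounter();
        System.out.println(counter.increment(Collections.emptyMap(), "foobar"));
    }
}
```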





[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157220655
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static 
org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = 
LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}
+
+public static NiFiAtlasClient getInstance() {
+if (nifiClient == null) {
+synchronized (NiFiAtlasClient.class) {
+if (nifiClient == null) {
+nifiClient = new 
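
The truncated diff above initializes `nifiClient` lazily with double-checked locking. Under the Java memory model, that idiom is only safe when the checked field is `volatile`, and the quoted declaration (`private static NiFiAtlasClient nifiClient;`) is not. A minimal sketch of the volatile form follows, using a hypothetical class `LazyClient` rather than the pull request's code.

```java
/** Hypothetical example: lazily initialized singleton with double-checked locking. */
class LazyClient {
    // volatile ensures a fully constructed instance is visible to other threads.
    private static volatile LazyClient instance;

    private LazyClient() {
    }

    static LazyClient getInstance() {
        LazyClient local = instance; // single volatile read on the fast path
        if (local == null) {
            synchronized (LazyClient.class) {
                local = instance;
                if (local == null) {
                    local = new LazyClient();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(final String[] args) {
        System.out.println(getInstance() == getInstance());
    }
}
```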

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292695#comment-16292695
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157232014
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+public class NiFiProvenanceEventAnalyzerFactory {
+
+private static final Logger logger = 
LoggerFactory.getLogger(NiFiProvenanceEventAnalyzerFactory.class);
+private static final Map 
analyzersForComponentType = new ConcurrentHashMap<>();
+private static final Map 
analyzersForTransitUri = new ConcurrentHashMap<>();
+private static final Map analyzersForProvenanceEventType = new 
ConcurrentHashMap<>();
+private static boolean loaded = false;
+
+private static void loadAnalyzers() {
+logger.debug("Loading NiFiProvenanceEventAnalyzer ...");
+final ServiceLoader serviceLoader
+= ServiceLoader.load(NiFiProvenanceEventAnalyzer.class);
+serviceLoader.forEach(analyzer -> {
+addAnalyzer(analyzer.targetComponentTypePattern(), 
analyzersForComponentType, analyzer);
+addAnalyzer(analyzer.targetTransitUriPattern(), 
analyzersForTransitUri, analyzer);
+final ProvenanceEventType eventType = 
analyzer.targetProvenanceEventType();
+if (eventType != null) {
+if 
(analyzersForProvenanceEventType.containsKey(eventType)) {
+logger.warn("For ProvenanceEventType {}, an Analyzer {} 
is already assigned." +
+" Only one analyzer for a type can be 
registered. Ignoring {}",
+eventType, 
analyzersForProvenanceEventType.get(eventType), analyzer);
+}
+analyzersForProvenanceEventType.put(eventType, analyzer);
+}
+});
+logger.info("Loaded NiFiProvenanceEventAnalyzers: 
componentTypes={}, transitUris={}", analyzersForComponentType, 
analyzersForTransitUri);
+}
+
+private static void addAnalyzer(String patternStr, Map toAdd,
+NiFiProvenanceEventAnalyzer analyzer) {
+if (patternStr != null && !patternStr.isEmpty()) {
+Pattern pattern = Pattern.compile(patternStr.trim());
+toAdd.put(pattern, analyzer);
+}
+}
+
+/**
+ * Find and retrieve NiFiProvenanceEventAnalyzer implementation for 
the specified targets.
+ * Pattern matching is performed in the following order, and the first match 
is returned:
+ * 
+ * Component type name. Use an analyzer supporting the Component 
type with its {@link NiFiProvenanceEventAnalyzer#targetComponentTypePattern()}.
+ * TransitUri. Use an analyzer supporting the TransitUri with its 
{@link NiFiProvenanceEventAnalyzer#targetTransitUriPattern()}.
+ * Provenance Event Type. Use an analyzer supporting the 
Provenance Event Type with its {@link 
NiFiProvenanceEventAnalyzer#targetProvenanceEventType()}.
+ * 
+ * @param typeName NiFi component type name.
+ * @param transitUri Transit URI.
+ * @param eventType Provenance 
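
The lookup order described in the Javadoc above (component type, then transit URI, then provenance event type) can be sketched as follows. The class `AnalyzerLookup` is hypothetical. Analyzers and event types are represented as plain strings to keep the example self-contained. Using a full `matches()` rather than a partial `find()` is an assumption, since the quoted diff is cut off before the matching code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical example of a three-stage analyzer lookup. */
class AnalyzerLookup {
    private final Map<Pattern, String> byComponentType = new LinkedHashMap<>();
    private final Map<Pattern, String> byTransitUri = new LinkedHashMap<>();
    private final Map<String, String> byEventType = new LinkedHashMap<>();

    void registerComponentType(final String regex, final String analyzer) {
        byComponentType.put(Pattern.compile(regex.trim()), analyzer);
    }

    void registerTransitUri(final String regex, final String analyzer) {
        byTransitUri.put(Pattern.compile(regex.trim()), analyzer);
    }

    void registerEventType(final String eventType, final String analyzer) {
        byEventType.put(eventType, analyzer);
    }

    /** Returns the first analyzer found, or null when nothing applies. */
    String find(final String typeName, final String transitUri, final String eventType) {
        String found = firstMatch(byComponentType, typeName);
        if (found == null) {
            found = firstMatch(byTransitUri, transitUri);
        }
        if (found == null && eventType != null) {
            found = byEventType.get(eventType);
        }
        return found;
    }

    private static String firstMatch(final Map<Pattern, String> candidates, final String input) {
        if (input == null) {
            return null;
        }
        for (final Map.Entry<Pattern, String> entry : candidates.entrySet()) {
            if (entry.getKey().matcher(input).matches()) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(final String[] args) {
        final AnalyzerLookup lookup = new AnalyzerLookup();
        lookup.registerTransitUri("^hdfs://.+$", "hdfsAnalyzer");
        System.out.println(lookup.find("SomeProcessor", "hdfs://nn:8020/tmp/a", "SEND"));
    }
}
```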

[jira] [Commented] (NIFI-2169) Improve RouteText performance with pre-compilation of RegEx in certain cases

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292693#comment-16292693
 ] 

ASF GitHub Bot commented on NIFI-2169:
--

Github user mgaido91 commented on the issue:

https://github.com/apache/nifi/pull/2343
  
I see, thanks @joewitt. I made some errors in finding the right log in 
Travis. I still have to familiarize myself with it. Thank you very much, I am fixing 
it.


> Improve RouteText performance with pre-compilation of RegEx in certain cases
> 
>
> Key: NIFI-2169
> URL: https://issues.apache.org/jira/browse/NIFI-2169
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Core Framework
>Affects Versions: 0.6.1
>Reporter: Stephane Maarek
>Assignee: Oleg Zhurakousky
>  Labels: beginner, easy
>
> When using RegEx matches for the RouteText processor (and possibly other 
> processors), the RegEx gets recompiled every time the processor works. The 
> RegEx could be precompiled / cached under certain conditions, in order to 
> improve the performance of the processor
> See email from Mark Payne:
> Re #2: The regular expression is compiled every time. This is done, though, 
> because the Regex allows the Expression
> Language to be used, so the Regex could actually be different for each 
> FlowFile. That being said, it could certainly be
> improved by either (a) pre-compiling in the case that no Expression Language 
> is used and/or (b) cache up to say 10
> Regex'es once they are compiled. 





[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292694#comment-16292694
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157231905
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+public class NiFiProvenanceEventAnalyzerFactory {
+
+private static final Logger logger = 
LoggerFactory.getLogger(NiFiProvenanceEventAnalyzerFactory.class);
+private static final Map 
analyzersForComponentType = new ConcurrentHashMap<>();
+private static final Map 
analyzersForTransitUri = new ConcurrentHashMap<>();
+private static final Map analyzersForProvenanceEventType = new 
ConcurrentHashMap<>();
+private static boolean loaded = false;
+
+private static void loadAnalyzers() {
+logger.debug("Loading NiFiProvenanceEventAnalyzer ...");
+final ServiceLoader serviceLoader
+= ServiceLoader.load(NiFiProvenanceEventAnalyzer.class);
+serviceLoader.forEach(analyzer -> {
+addAnalyzer(analyzer.targetComponentTypePattern(), 
analyzersForComponentType, analyzer);
+addAnalyzer(analyzer.targetTransitUriPattern(), 
analyzersForTransitUri, analyzer);
+final ProvenanceEventType eventType = 
analyzer.targetProvenanceEventType();
+if (eventType != null) {
+if 
(analyzersForProvenanceEventType.containsKey(eventType)) {
+logger.warn("For ProvenanceEventType {}, an Analyzer {} 
is already assigned." +
+" Only one analyzer for a type can be 
registered. Ignoring {}",
+eventType, 
analyzersForProvenanceEventType.get(eventType), analyzer);
+}
+analyzersForProvenanceEventType.put(eventType, analyzer);
+}
+});
+logger.info("Loaded NiFiProvenanceEventAnalyzers: 
componentTypes={}, transitUris={}", analyzersForComponentType, 
analyzersForTransitUri);
+}
+
+private static void addAnalyzer(String patternStr, Map toAdd,
+NiFiProvenanceEventAnalyzer analyzer) {
+if (patternStr != null && !patternStr.isEmpty()) {
+Pattern pattern = Pattern.compile(patternStr.trim());
+toAdd.put(pattern, analyzer);
+}
+}
+
+/**
+ * Find and retrieve NiFiProvenanceEventAnalyzer implementation for 
the specified targets.
+ * Pattern matching is performed in the following order, and the first match 
is returned:
+ * 
+ * Component type name. Use an analyzer supporting the Component 
type with its {@link NiFiProvenanceEventAnalyzer#targetComponentTypePattern()}.
+ * TransitUri. Use an analyzer supporting the TransitUri with its 
{@link NiFiProvenanceEventAnalyzer#targetTransitUriPattern()}.
+ * Provenance Event Type. Use an analyzer supporting the 
Provenance Event Type with its {@link 
NiFiProvenanceEventAnalyzer#targetProvenanceEventType()}.
+ * 
+ * @param typeName NiFi component type name.
+ * @param transitUri Transit URI.
+ * @param eventType Provenance 

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157231905
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+public class NiFiProvenanceEventAnalyzerFactory {
+
+private static final Logger logger = 
LoggerFactory.getLogger(NiFiProvenanceEventAnalyzerFactory.class);
+private static final Map<Pattern, NiFiProvenanceEventAnalyzer> 
analyzersForComponentType = new ConcurrentHashMap<>();
+private static final Map<Pattern, NiFiProvenanceEventAnalyzer> 
analyzersForTransitUri = new ConcurrentHashMap<>();
+private static final Map<ProvenanceEventType, NiFiProvenanceEventAnalyzer> analyzersForProvenanceEventType = new 
ConcurrentHashMap<>();
+private static boolean loaded = false;
+
+private static void loadAnalyzers() {
+logger.debug("Loading NiFiProvenanceEventAnalyzer ...");
+final ServiceLoader<NiFiProvenanceEventAnalyzer> serviceLoader
+= ServiceLoader.load(NiFiProvenanceEventAnalyzer.class);
+serviceLoader.forEach(analyzer -> {
+addAnalyzer(analyzer.targetComponentTypePattern(), 
analyzersForComponentType, analyzer);
+addAnalyzer(analyzer.targetTransitUriPattern(), 
analyzersForTransitUri, analyzer);
+final ProvenanceEventType eventType = 
analyzer.targetProvenanceEventType();
+if (eventType != null) {
+if 
(analyzersForProvenanceEventType.containsKey(eventType)) {
+logger.warn("For ProvenanceEventType {}, an Analyzer {} 
is already assigned." +
+" Only one analyzer for a type can be 
registered. Ignoring {}",
+eventType, 
analyzersForProvenanceEventType.get(eventType), analyzer);
+}
+analyzersForProvenanceEventType.put(eventType, analyzer);
+}
+});
+logger.info("Loaded NiFiProvenanceEventAnalyzers: 
componentTypes={}, transitUris={}", analyzersForComponentType, 
analyzersForTransitUri);
+}
+
+private static void addAnalyzer(String patternStr, Map<Pattern, NiFiProvenanceEventAnalyzer> toAdd,
+NiFiProvenanceEventAnalyzer analyzer) {
+if (patternStr != null && !patternStr.isEmpty()) {
+Pattern pattern = Pattern.compile(patternStr.trim());
+toAdd.put(pattern, analyzer);
+}
+}
+
+/**
+ * Find and retrieve NiFiProvenanceEventAnalyzer implementation for 
the specified targets.
+ * Pattern matching is performed in the following order, and the first 
match found is returned:
+ * <ol>
+ * <li>Component type name. Use an analyzer supporting the Component 
type with its {@link NiFiProvenanceEventAnalyzer#targetComponentTypePattern()}.
+ * <li>TransitUri. Use an analyzer supporting the TransitUri with its 
{@link NiFiProvenanceEventAnalyzer#targetTransitUriPattern()}.
+ * <li>Provenance Event Type. Use an analyzer supporting the 
Provenance Event Type with its {@link 
NiFiProvenanceEventAnalyzer#targetProvenanceEventType()}.
+ * </ol>
+ * @param typeName NiFi component type name.
+ * @param transitUri Transit URI.
+ * @param eventType Provenance event type.
+ * @return Instance of NiFiProvenanceEventAnalyzer if one is found for 
the specified targets, otherwise null.
+ */
+public static NiFiProvenanceEventAnalyzer getAnalyzer(String typeName, 
String 

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157232014
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/NiFiProvenanceEventAnalyzerFactory.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
+
+public class NiFiProvenanceEventAnalyzerFactory {
+
+private static final Logger logger = 
LoggerFactory.getLogger(NiFiProvenanceEventAnalyzerFactory.class);
+private static final Map<Pattern, NiFiProvenanceEventAnalyzer> 
analyzersForComponentType = new ConcurrentHashMap<>();
+private static final Map<Pattern, NiFiProvenanceEventAnalyzer> 
analyzersForTransitUri = new ConcurrentHashMap<>();
+private static final Map<ProvenanceEventType, NiFiProvenanceEventAnalyzer> analyzersForProvenanceEventType = new 
ConcurrentHashMap<>();
+private static boolean loaded = false;
+
+private static void loadAnalyzers() {
+logger.debug("Loading NiFiProvenanceEventAnalyzer ...");
+final ServiceLoader<NiFiProvenanceEventAnalyzer> serviceLoader
+= ServiceLoader.load(NiFiProvenanceEventAnalyzer.class);
+serviceLoader.forEach(analyzer -> {
+addAnalyzer(analyzer.targetComponentTypePattern(), 
analyzersForComponentType, analyzer);
+addAnalyzer(analyzer.targetTransitUriPattern(), 
analyzersForTransitUri, analyzer);
+final ProvenanceEventType eventType = 
analyzer.targetProvenanceEventType();
+if (eventType != null) {
+if 
(analyzersForProvenanceEventType.containsKey(eventType)) {
+logger.warn("For ProvenanceEventType {}, an Analyzer {} 
is already assigned." +
+" Only one analyzer for a type can be 
registered. Ignoring {}",
+eventType, 
analyzersForProvenanceEventType.get(eventType), analyzer);
+}
+analyzersForProvenanceEventType.put(eventType, analyzer);
+}
+});
+logger.info("Loaded NiFiProvenanceEventAnalyzers: 
componentTypes={}, transitUris={}", analyzersForComponentType, 
analyzersForTransitUri);
+}
+
+private static void addAnalyzer(String patternStr, Map<Pattern, NiFiProvenanceEventAnalyzer> toAdd,
+NiFiProvenanceEventAnalyzer analyzer) {
+if (patternStr != null && !patternStr.isEmpty()) {
+Pattern pattern = Pattern.compile(patternStr.trim());
+toAdd.put(pattern, analyzer);
+}
+}
+
+/**
+ * Find and retrieve NiFiProvenanceEventAnalyzer implementation for 
the specified targets.
+ * Pattern matching is performed in the following order, and the first 
match found is returned:
+ * <ol>
+ * <li>Component type name. Use an analyzer supporting the Component 
type with its {@link NiFiProvenanceEventAnalyzer#targetComponentTypePattern()}.
+ * <li>TransitUri. Use an analyzer supporting the TransitUri with its 
{@link NiFiProvenanceEventAnalyzer#targetTransitUriPattern()}.
+ * <li>Provenance Event Type. Use an analyzer supporting the 
Provenance Event Type with its {@link 
NiFiProvenanceEventAnalyzer#targetProvenanceEventType()}.
+ * </ol>
+ * @param typeName NiFi component type name.
+ * @param transitUri Transit URI.
+ * @param eventType Provenance event type.
+ * @return Instance of NiFiProvenanceEventAnalyzer if one is found for 
the specified targets, otherwise null.
+ */
+public static NiFiProvenanceEventAnalyzer getAnalyzer(String typeName, 
String 

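The lookup order described in the Javadoc quoted above (component type pattern first, then transit URI pattern, then provenance event type) can be sketched roughly as follows. This is only a minimal, self-contained illustration and not the code from the pull request, whose getAnalyzer body is cut off in the quoted diff. The names AnalyzerLookup, Analyzer and EventType are hypothetical stand-ins, and both the full-string matches() check and the keep-the-first-registration behaviour are assumptions made for the example.

```java
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in for ProvenanceEventType (assumption for this sketch).
enum EventType { SEND, RECEIVE, DROP }

// Hypothetical stand-in for NiFiProvenanceEventAnalyzer (assumption for this sketch).
interface Analyzer {
    String name();
}

class AnalyzerLookup {
    // Insertion-ordered so that "first match wins" is deterministic in this sketch.
    private final Map<Pattern, Analyzer> byComponentType = new LinkedHashMap<>();
    private final Map<Pattern, Analyzer> byTransitUri = new LinkedHashMap<>();
    private final Map<EventType, Analyzer> byEventType = new EnumMap<>(EventType.class);

    void registerComponentType(String regex, Analyzer analyzer) {
        byComponentType.put(Pattern.compile(regex.trim()), analyzer);
    }

    void registerTransitUri(String regex, Analyzer analyzer) {
        byTransitUri.put(Pattern.compile(regex.trim()), analyzer);
    }

    // Assumption: only the first analyzer registered for an event type is kept.
    void registerEventType(EventType eventType, Analyzer analyzer) {
        byEventType.putIfAbsent(eventType, analyzer);
    }

    // Tries component type, then transit URI, then event type; returns null if nothing matches.
    Analyzer find(String typeName, String transitUri, EventType eventType) {
        Analyzer found = firstMatch(byComponentType, typeName);
        if (found == null) {
            found = firstMatch(byTransitUri, transitUri);
        }
        if (found == null && eventType != null) {
            found = byEventType.get(eventType);
        }
        return found;
    }

    private static Analyzer firstMatch(Map<Pattern, Analyzer> candidates, String input) {
        if (input == null) {
            return null;
        }
        for (Map.Entry<Pattern, Analyzer> entry : candidates.entrySet()) {
            if (entry.getKey().matcher(input).matches()) {
                return entry.getValue();
            }
        }
        return null;
    }
}
```

With this shape, an analyzer registered for a component type takes precedence over one that only matches the transit URI, and the event type mapping is used only as a last resort.
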
[GitHub] nifi issue #2343: NIFI-2169: Cache compiled regexp for RouteText

2017-12-15 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/nifi/pull/2343
  
I see, thanks @joewitt. I had trouble finding the right log in 
Travis; I still need to familiarize myself with it. Thank you very much, I am 
fixing it.


---


[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157238842
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+private final Logger logger = 
LoggerFactory.getLogger(StandardAnalysisContext.class);
+private final NiFiFlow nifiFlow;
+private final ClusterResolver clusterResolver;
+private final ProvenanceRepository provenanceRepository;
+
+public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver 
clusterResolver,
+   ProvenanceRepository 
provenanceRepository) {
+this.nifiFlow = nifiFlow;
+this.clusterResolver = clusterResolver;
+this.provenanceRepository = provenanceRepository;
+}
+
+@Override
+public List<ConnectionStatus> findConnectionTo(String componentId) {
+return nifiFlow.getIncomingRelationShips(componentId);
+}
+
+@Override
+public List<ConnectionStatus> findConnectionFrom(String componentId) {
+return nifiFlow.getOutgoingRelationShips(componentId);
+}
+
+@Override
+public String getNiFiClusterName() {
+return nifiFlow.getClusterName();
+}
+
+@Override
+public ClusterResolver getClusterResolver() {
+return clusterResolver;
+}
+
+private ComputeLineageResult getLineageResult(long eventId, 
ComputeLineageSubmission submission) {
+final ComputeLineageResult result = submission.getResult();
+try {
+if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
+return result;
+}
+logger.warn("Lineage query for {} timed out.", new 
Object[]{eventId});
+} catch (InterruptedException e) {
+logger.warn("Lineage query for {} was interrupted due to {}.", 
new Object[]{eventId, e}, e);
+} finally {
+submission.cancel();
+}
+
+return null;
+}
+
+@Override
+public ComputeLineageResult queryLineage(long eventId) {
+final ComputeLineageSubmission submission = 
provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+public ComputeLineageResult findParents(long eventId) {
+final ComputeLineageSubmission submission = 
provenanceRepository.submitExpandParents(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+// NOTE: This user is required to avoid NullPointerException at 
PersistentProvenanceRepository.submitLineageComputation
+private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
--- End diff --

I see. What I am seeing is different from your description. I will try to 
find where the event level auth is implemented.


---


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292747#comment-16292747
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157240784
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+@Override
+public void processEvent(AnalysisContext analysisContext, NiFiFlow 
nifiFlow, ProvenanceEventRecord event) {
+final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+if (refs == null || (refs.isEmpty())) {
+return;
+}
+
+if ("Remote Input Port".equals(event.getComponentType()) || 
"Remote Output Port".equals(event.getComponentType())) {
+processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+} else {
+addDataSetRefs(nifiFlow, refs);
+}
+
+}
+
+/**
+ * Create a flow_path entity corresponding to the target 
RemoteGroupPort when a SEND/RECEIVE event are received.
+ * Because such entity can not be created in advance while analyzing 
flow statically,
+ * as ReportingTask can not determine whether a component id is a 
RemoteGroupPort,
+ * since connectionStatus is the only available information in 
ReportingContext.
+ * ConnectionStatus only knows component id, component type is unknown.
+ * For example, there is no difference to tell if a connected 
component is a funnel or a RemoteGroupPort.
+ */
+private void processRemotePortEvent(AnalysisContext analysisContext, 
NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+final boolean isRemoteInputPort = "Remote Input 
Port".equals(event.getComponentType());
+
+// Create a RemoteInputPort Process.
+// event.getComponentId returns UUID for RemoteGroupPort as a 
client of S2S, and it's different from a remote port UUID (portDataSetid).
+// See NIFI-4571 for detail.
+final Referenceable remotePortDataSet = isRemoteInputPort ? 
analyzedRefs.getOutputs().iterator().next() :  
analyzedRefs.getInputs().iterator().next();
+final String portProcessId = event.getComponentId();
+
+final NiFiFlowPath remotePortProcess = new 
NiFiFlowPath(portProcessId);
+remotePortProcess.setName(event.getComponentType());
+remotePortProcess.addProcessor(portProcessId);
+
+// For RemoteInputPort, need to find the previous component 
connected to this port,
+// which passed this particular FlowFile.
+// That is only possible by calling lineage API.
+if (isRemoteInputPort) {
+final 

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292640#comment-16292640
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157217791
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static 

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292648#comment-16292648
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157219892
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+private final Logger logger = 
LoggerFactory.getLogger(StandardAnalysisContext.class);
+private final NiFiFlow nifiFlow;
+private final ClusterResolver clusterResolver;
+private final ProvenanceRepository provenanceRepository;
+
+public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver 
clusterResolver,
+   ProvenanceRepository 
provenanceRepository) {
+this.nifiFlow = nifiFlow;
+this.clusterResolver = clusterResolver;
+this.provenanceRepository = provenanceRepository;
+}
+
+@Override
+public List<ConnectionStatus> findConnectionTo(String componentId) {
+return nifiFlow.getIncomingRelationShips(componentId);
+}
+
+@Override
+public List<ConnectionStatus> findConnectionFrom(String componentId) {
+return nifiFlow.getOutgoingRelationShips(componentId);
+}
+
+@Override
+public String getNiFiClusterName() {
+return nifiFlow.getClusterName();
+}
+
+@Override
+public ClusterResolver getClusterResolver() {
+return clusterResolver;
+}
+
+private ComputeLineageResult getLineageResult(long eventId, 
ComputeLineageSubmission submission) {
+final ComputeLineageResult result = submission.getResult();
+try {
+if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
--- End diff --

I agree that this can be a costly operation. The reason to query 
provenance is to compute a lineage from a DROP provenance event. This is used 
by the 'Complete Path' strategy. I wrote documentation on the performance 
impact. If this does not work for a use case, the user can choose another 
strategy, 'Simple Path'. Simple Path does not query provenance events this way. 
It analyzes each individual event, so it should be more lightweight.


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>
> While Apache NiFi has provenance and event-level lineage support within its 
> data flow, Apache Atlas also manages lineage between datasets and the 
> processes interacting with such data. 
> It would be beneficial for users of both NiFi and Atlas if they could 
> see end-to-end data lineage on the Atlas lineage graph, as some types of 
> datasets are 

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157219892
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+private final Logger logger = 
LoggerFactory.getLogger(StandardAnalysisContext.class);
+private final NiFiFlow nifiFlow;
+private final ClusterResolver clusterResolver;
+private final ProvenanceRepository provenanceRepository;
+
+public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver 
clusterResolver,
+   ProvenanceRepository 
provenanceRepository) {
+this.nifiFlow = nifiFlow;
+this.clusterResolver = clusterResolver;
+this.provenanceRepository = provenanceRepository;
+}
+
+@Override
+public List<ConnectionStatus> findConnectionTo(String componentId) {
+return nifiFlow.getIncomingRelationShips(componentId);
+}
+
+@Override
+public List<ConnectionStatus> findConnectionFrom(String componentId) {
+return nifiFlow.getOutgoingRelationShips(componentId);
+}
+
+@Override
+public String getNiFiClusterName() {
+return nifiFlow.getClusterName();
+}
+
+@Override
+public ClusterResolver getClusterResolver() {
+return clusterResolver;
+}
+
+private ComputeLineageResult getLineageResult(long eventId, 
ComputeLineageSubmission submission) {
+final ComputeLineageResult result = submission.getResult();
+try {
+if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
--- End diff --

I agree that this can be a costly operation. The reason to query 
provenance is to compute a lineage from a DROP provenance event. This is used 
by the 'Complete Path' strategy. I wrote documentation on the performance 
impact. If this does not work for a use case, the user can choose another 
strategy, 'Simple Path'. Simple Path does not query provenance events this way. 
It analyzes each individual event, so it should be more lightweight.
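
The bounded wait in the quoted getLineageResult (wait up to a fixed timeout, return null on timeout or interruption, always cancel the submission) is what keeps the cost of a lineage query limited. A minimal sketch of that pattern using only java.util.concurrent is shown here. BoundedQuery and awaitOrNull are hypothetical names, and a plain Future stands in for NiFi's ComputeLineageSubmission, so this is an illustration under those assumptions rather than the pull request's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a result for a bounded time, then gives up.
class BoundedQuery {

    // Returns the result if it arrives within the timeout, otherwise null.
    // The submission is always cancelled afterwards so that a slow query
    // does not keep consuming resources; cancel() is a no-op once completed.
    static <T> T awaitOrNull(Future<T> submission, long timeout, TimeUnit unit) {
        try {
            return submission.get(timeout, unit);
        } catch (TimeoutException e) {
            // Query took too long: the caller falls back to "no lineage".
            return null;
        } catch (ExecutionException e) {
            // Query failed: treated the same as "no lineage" in this sketch.
            return null;
        } catch (InterruptedException e) {
            // Restore the interrupt flag (a choice made in this sketch).
            Thread.currentThread().interrupt();
            return null;
        } finally {
            submission.cancel(true);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("lineage");
        System.out.println(awaitOrNull(done, 1, TimeUnit.SECONDS));

        CompletableFuture<String> never = new CompletableFuture<>();
        System.out.println(awaitOrNull(never, 50, TimeUnit.MILLISECONDS));
    }
}
```

A caller that receives null can simply skip lineage-based processing for that event, which is in the same spirit as choosing the lighter 'Simple Path' strategy when per-event lineage queries are too expensive.
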


---


[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157221674
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import 
org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class is not thread-safe as it holds uncommitted notification 
messages within instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link 
#commitMessages()} should be used serially from a single thread.
+ */
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
+
+public static final String NIFI_USER = "nifi";
--- End diff --

Yes, it would. However, at this moment, this user is not significant. 
Please see this conversation on the same subject.
https://github.com/apache/nifi/pull/2335#discussion_r156992235


---


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292654#comment-16292654
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157221249
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class is not thread-safe as it holds uncommitted notification messages within instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
+ */
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
--- End diff --

Good catch, thank you! I'll fix this.


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>
> While Apache NiFi has provenance and event level lineage support within its 
> data flow, Apache Atlas also manages lineage between datasets and the processes 
> that interact with such data. 
> It would be beneficial for users who use both NiFi and Atlas if they could 
> see end-to-end data lineage on the Atlas lineage graph, as some types of dataset 
> are processed by both NiFi and technologies around Atlas such as Storm, 
> Falcon or Sqoop. For example, Kafka topics and Hive tables.
> In order to make this integration happen, I propose a NiFi reporting task 
> that analyzes a NiFi flow and then creates DataSet and Process entities in Atlas.
> The challenge is how to design NiFi flow dataset level lineage within the Atlas 
> lineage graph.
> If we just add a single NiFi process and connect every DataSet from/to it, it 
> would be too ambiguous since it won't be clear which part of a NiFi flow 
> actually interacts with a certain dataset.
> But if we put every NiFi processor as an independent process in Atlas, it would 
> be too granular. Also, since we already have detailed event level lineage in 
> NiFi, we wouldn't need the same level in 

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292661#comment-16292661
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157222944
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}
+
+

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157222944
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}
+
+public static NiFiAtlasClient getInstance() {
+if (nifiClient == null) {
+synchronized (NiFiAtlasClient.class) {
+if (nifiClient == null) {
+nifiClient = new 

[GitHub] nifi pull request #2343: NIFI-2169: Cache compiled regexp for RouteText

2017-12-15 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2343#discussion_r157232403
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
 ---
@@ -209,6 +215,30 @@
 private volatile Map propertyMap = new HashMap<>();
 private volatile Pattern groupingRegex = null;
 
+@VisibleForTesting
+final static int PATTERNS_CACHE_MAXIMUM_ENTRIES = 10;
+
+/**
+ * LRU cache for the compiled patterns. The size of the cache is determined by the value of
+ * {@link #PATTERNS_CACHE_MAXIMUM_ENTRIES}.
+ */
+@VisibleForTesting
+final ConcurrentMap, Pattern> patternsCache = CacheBuilder.newBuilder()
+.maximumSize(PATTERNS_CACHE_MAXIMUM_ENTRIES)
+., Pattern>build()
+.asMap();
+
+private final Function, Pattern> compileRegex = ignoreCaseAndRegex -> {
--- End diff --

Again, I would avoid using the Pair here, and would probably avoid the 
Function altogether. Since it seems to be referenced only once, I'd prefer 
to inline it in the cacheCompiledPattern method, so that there we could 
just call something like:

`return patternsCache.computeIfAbsent(key, toCompile -> ignoreCase ? Pattern.compile(toCompile, Pattern.CASE_INSENSITIVE) : Pattern.compile(toCompile));`
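
As a rough illustration of that inlined approach, here is a minimal, self-contained sketch. It is not the RouteText code: the class name `PatternCacheSketch` is hypothetical, and it assumes plain unbounded `ConcurrentHashMap`s (one per case-sensitivity setting, so the regex string alone can serve as the key) in place of the size-limited Guava cache from the diff.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;

// Hypothetical stand-in for the caching logic under review; not the actual RouteText class.
class PatternCacheSketch {

    // Assumption: two unbounded maps instead of the bounded Guava cache in the diff.
    // Keeping one map per case-sensitivity setting lets the regex string be the key.
    private final ConcurrentMap<String, Pattern> caseSensitive = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Pattern> caseInsensitive = new ConcurrentHashMap<>();

    // Compiles the regex on first use and returns the cached Pattern afterwards.
    Pattern cacheCompiledPattern(String regex, boolean ignoreCase) {
        return ignoreCase
                ? caseInsensitive.computeIfAbsent(regex,
                        toCompile -> Pattern.compile(toCompile, Pattern.CASE_INSENSITIVE))
                : caseSensitive.computeIfAbsent(regex,
                        toCompile -> Pattern.compile(toCompile));
    }
}
```

With a single shared map, the key would also have to encode the ignore-case flag; otherwise the same regex requested with different flags would return whichever Pattern was compiled first.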


---


[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157237190
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+@Override
+public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+if (refs == null || (refs.isEmpty())) {
+return;
+}
+
+if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
+processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+} else {
+addDataSetRefs(nifiFlow, refs);
+}
+
+}
+
+/**
+ * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event are received.
+ * Because such entity can not be created in advance while analyzing flow statically,
+ * as ReportingTask can not determine whether a component id is a RemoteGroupPort,
+ * since connectionStatus is the only available information in ReportingContext.
+ * ConnectionStatus only knows component id, component type is unknown.
+ * For example, there is no difference to tell if a connected component is a funnel or a RemoteGroupPort.
+ */
+private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
+
+// Create a RemoteInputPort Process.
+// event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
+// See NIFI-4571 for detail.
+final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
+final String portProcessId = event.getComponentId();
+
+final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+remotePortProcess.setName(event.getComponentType());
+remotePortProcess.addProcessor(portProcessId);
+
+// For RemoteInputPort, need to find the previous component connected to this port,
+// which passed this particular FlowFile.
+// That is only possible by calling lineage API.
+if (isRemoteInputPort) {
+final ProvenanceEventRecord previousEvent = findPreviousProvenanceEvent(analysisContext, event);
+if (previousEvent == null) {
+logger.warn("Previous event was not found: {}", new Object[]{event});
+

[GitHub] nifi-minifi-cpp issue #217: MINIFICPP-41: First iteration of C api

2017-12-15 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/217
  
Please merge these two commits.


---


[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157240784
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+@Override
+public void processEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event) {
+final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+if (refs == null || (refs.isEmpty())) {
+return;
+}
+
+if ("Remote Input Port".equals(event.getComponentType()) || "Remote Output Port".equals(event.getComponentType())) {
+processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+} else {
+addDataSetRefs(nifiFlow, refs);
+}
+
+}
+
+/**
+ * Create a flow_path entity corresponding to the target RemoteGroupPort when a SEND/RECEIVE event are received.
+ * Because such entity can not be created in advance while analyzing flow statically,
+ * as ReportingTask can not determine whether a component id is a RemoteGroupPort,
+ * since connectionStatus is the only available information in ReportingContext.
+ * ConnectionStatus only knows component id, component type is unknown.
+ * For example, there is no difference to tell if a connected component is a funnel or a RemoteGroupPort.
+ */
+private void processRemotePortEvent(AnalysisContext analysisContext, NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+final boolean isRemoteInputPort = "Remote Input Port".equals(event.getComponentType());
+
+// Create a RemoteInputPort Process.
+// event.getComponentId returns UUID for RemoteGroupPort as a client of S2S, and it's different from a remote port UUID (portDataSetid).
+// See NIFI-4571 for detail.
+final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
+final String portProcessId = event.getComponentId();
+
+final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+remotePortProcess.setName(event.getComponentType());
+remotePortProcess.addProcessor(portProcessId);
+
+// For RemoteInputPort, need to find the previous component connected to this port,
+// which passed this particular FlowFile.
+// That is only possible by calling lineage API.
+if (isRemoteInputPort) {
+final ProvenanceEventRecord previousEvent = findPreviousProvenanceEvent(analysisContext, event);
+if (previousEvent == null) {
+logger.warn("Previous event was not found: {}", new Object[]{event});
+

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292649#comment-16292649
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157220655
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
 ---
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.util.StringUtils;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.atlas.AtlasUtils.findIdByQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.getComponentIdFromQualifiedName;
+import static org.apache.nifi.atlas.AtlasUtils.toStr;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.AS_IS;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.CREATED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.DELETED;
+import static org.apache.nifi.atlas.NiFiFlow.EntityChangeType.UPDATED;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_DESCRIPTION;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_FLOW_PATHS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUT_PORTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUEUES;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
+import static org.apache.nifi.atlas.NiFiTypes.ENTITIES;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_INPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_OUTPUT_PORT;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class NiFiAtlasClient {
+
+private static final Logger logger = LoggerFactory.getLogger(NiFiAtlasClient.class);
+
+private static NiFiAtlasClient nifiClient;
+private AtlasClientV2 atlasClient;
+
+private NiFiAtlasClient() {
+super();
+}
+
+

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292652#comment-16292652
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157221167
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/AtlasUtils.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import org.apache.atlas.model.instance.AtlasObjectId;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+
+public class AtlasUtils {
+
+public static String toStr(Object obj) {
+return obj != null ? obj.toString() : null;
+}
+
+
+public static boolean isGuidAssigned(String guid) {
+return guid != null && !guid.startsWith("-");
--- End diff --

Yes, it's an Atlas internal implementation detail. However, a null check alone is not 
sufficient, because entities created on the client side but not yet registered in 
Atlas have negative GUIDs. I'd like to keep it this way if there are no strong 
objections. Thank you.
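
To illustrate the point about negative GUIDs, here is a minimal sketch. The class name GuidCheckSketch and the sample GUID values are made up for illustration; only the isGuidAssigned logic mirrors the diff above.

```java
public class GuidCheckSketch {

    // Same check as in the diff: a GUID counts as assigned only if it is
    // non-null and does not look like a client-side placeholder ("-...").
    public static boolean isGuidAssigned(String guid) {
        return guid != null && !guid.startsWith("-");
    }

    public static void main(String[] args) {
        // No GUID at all: not assigned.
        System.out.println(isGuidAssigned(null));
        // Hypothetical placeholder GUID of an entity not yet registered in Atlas.
        System.out.println(isGuidAssigned("-1234"));
        // Hypothetical GUID in the shape of a server-assigned value.
        System.out.println(isGuidAssigned("86420a14-2fab-3e1e-4331-fb6ab42f58e0"));
    }
}
```

A null-only check would treat the second case as assigned, which is the situation described above.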


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>
> While Apache NiFi has provenance and event-level lineage support within its 
> data flow, Apache Atlas manages lineage between datasets and the processes 
> that interact with such data. 
> It would be beneficial for users of both NiFi and Atlas if they could 
> see end-to-end data lineage on the Atlas lineage graph, as some types of datasets 
> are processed by both NiFi and technologies around Atlas such as Storm, 
> Falcon or Sqoop. For example, Kafka topics and Hive tables.
> In order to make this integration happen, I propose a NiFi reporting task 
> that analyzes a NiFi flow and then creates DataSet and Process entities in Atlas.
> The challenge is how to design NiFi flow dataset-level lineage within the Atlas 
> lineage graph.
> If we just add a single NiFi process and connect every DataSet from/to it, it 
> would be too ambiguous since it won't be clear which part of a NiFi flow 
> actually interacts with a certain dataset.
> But if we represent every NiFi processor as an independent process in Atlas, it 
> would be too granular. Also, since we already have detailed event-level lineage in 
> NiFi, we wouldn't need the same level in Atlas.
> If we can group certain processors in a NiFi flow as a process in Atlas, it 
> would be a nice granularity.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157222062
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import 
org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class is not thread-safe as it holds uncommitted notification 
messages within instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link 
#commitMessages()} should be used serially from a single thread.
+ */
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
+
+public static final String NIFI_USER = "nifi";
+
+private static final Logger logger = 
LoggerFactory.getLogger(NiFIAtlasHook.class);
+private static final String CONF_PREFIX = "atlas.hook.nifi.";
+private static final String HOOK_NUM_RETRIES = CONF_PREFIX + 
"numRetries";
+
+private final NiFiAtlasClient atlasClient;
+
+/**
+ * An index to resolve a qualifiedName from a GUID.
+ */
+private final Map guidToQualifiedName;
+/**
+ * An index to resolve a Referenceable from a typeName::qualifiedName.
+ */
+private final Map typedQualifiedNameToRef;
+
+
+private static  Map createCache(final int maxSize) {
+return new LinkedHashMap(maxSize, 0.75f, true) {
--- End diff --

Well, it's just a private method to create cache instances within this 
class. So if different default optimizations are needed, then we can do that 
here as well without affecting others.
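
For readers following along, a minimal sketch of how such a cache factory can work. The diff above is cut off before the body of the anonymous class, so the removeEldestEntry override below is an assumption about the intent (an LRU cache bounded by maxSize), not a copy of the PR code. The class name LruCacheSketch is made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LruCacheSketch {

    // accessOrder = true keeps entries ordered from least to most recently
    // accessed, so the "eldest" entry is the least recently used one.
    public static <K, V> Map<K, V> createCache(final int maxSize) {
        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Assumed eviction rule: drop the LRU entry once the bound is exceeded.
                return size() > maxSize;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = createCache(2);
        cache.put("a", "1");
        cache.put("b", "2");
        cache.get("a");      // touch "a" so "b" becomes the eldest entry
        cache.put("c", "3"); // exceeds the bound, evicts "b"
        System.out.println(cache.keySet()); // [a, c]
    }
}
```

Because the factory is private to the class, the load factor or eviction rule could be changed here without touching callers. Note that LinkedHashMap is not thread-safe, which matches the single-thread usage stated in the class Javadoc.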


---


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292657#comment-16292657
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157221674
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import 
org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class is not thread-safe as it holds uncommitted notification 
messages within instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link 
#commitMessages()} should be used serially from a single thread.
+ */
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
+
+public static final String NIFI_USER = "nifi";
--- End diff --

Yes, it would. However, at this moment, this user is not significant. 
Please see this conversation on the same subject.
https://github.com/apache/nifi/pull/2335#discussion_r156992235



[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292663#comment-16292663
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157223751
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+@Override
+public void processEvent(AnalysisContext analysisContext, NiFiFlow 
nifiFlow, ProvenanceEventRecord event) {
+final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+if (refs == null || (refs.isEmpty())) {
+return;
+}
+
+if ("Remote Input Port".equals(event.getComponentType()) || 
"Remote Output Port".equals(event.getComponentType())) {
+processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+} else {
+addDataSetRefs(nifiFlow, refs);
+}
+
+}
+
+/**
+ * Create a flow_path entity corresponding to the target 
RemoteGroupPort when a SEND/RECEIVE event are received.
+ * Because such entity can not be created in advance while analyzing 
flow statically,
+ * as ReportingTask can not determine whether a component id is a 
RemoteGroupPort,
+ * since connectionStatus is the only available information in 
ReportingContext.
+ * ConnectionStatus only knows component id, component type is unknown.
+ * For example, there is no difference to tell if a connected 
component is a funnel or a RemoteGroupPort.
+ */
+private void processRemotePortEvent(AnalysisContext analysisContext, 
NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+final boolean isRemoteInputPort = "Remote Input 
Port".equals(event.getComponentType());
+
+// Create a RemoteInputPort Process.
+// event.getComponentId returns UUID for RemoteGroupPort as a 
client of S2S, and it's different from a remote port UUID (portDataSetid).
+// See NIFI-4571 for detail.
+final Referenceable remotePortDataSet = isRemoteInputPort ? 
analyzedRefs.getOutputs().iterator().next() :  
analyzedRefs.getInputs().iterator().next();
+final String portProcessId = event.getComponentId();
+
+final NiFiFlowPath remotePortProcess = new 
NiFiFlowPath(portProcessId);
+remotePortProcess.setName(event.getComponentType());
+remotePortProcess.addProcessor(portProcessId);
+
+// For RemoteInputPort, need to find the previous component 
connected to this port,
+// which passed this particular FlowFile.
+// That is only possible by calling lineage API.
+if (isRemoteInputPort) {
+final 

[jira] [Commented] (NIFI-4629) Non-existing attribute in ControlRate configuration causes NullPointerException

2017-12-15 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-4629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292684#comment-16292684
 ] 

ASF subversion and git services commented on NIFI-4629:
---

Commit 463dcd88129a4b99a08072ece787b2146b5fb790 in nifi's branch 
refs/heads/master from [~mgaido]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=463dcd8 ]

NIFI-4629: This closes #2345. Put flowfiles without the grouping attribute in 
the default group

Signed-off-by: joewitt 


> Non-existing attribute in ControlRate configuration causes 
> NullPointerException
> ---
>
> Key: NIFI-4629
> URL: https://issues.apache.org/jira/browse/NIFI-4629
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Configuration, Core UI
>Affects Versions: 1.4.0
> Environment: Ubuntu LTS 16.04
> openjdk-8-jre
>Reporter: Fredrik Sko
>Priority: Critical
>  Labels: ControlRate, NullPointerException, Processor
> Attachments: ControlRate-NP.xml
>
>
> When using the ControlRate processor, defining the "Grouping Attribute" with 
> a missing/non-existing attribute name produces NullPointerException errors.
> Processor configuration:
> {quote}
> Rate Control Criteria: flowfile count
> Maximum rate: 10
> Rate Controlled Attributes: (No value set)
> Time Duration: 1 min
> Grouping Attribute: foobar
> {quote}
> ControlRate with the following configuration when sent a flowfile without the 
> attribute {{foobar}} generates the following error:
> bq. ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] 
> ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] failed to process due 
> to java.lang.NullPointerException; rolling back session: null
> Additionally, the incoming flowfiles now ends up in some "dead" state where 
> I'm unable to even empty the queue.
> A simple template for reproduction is attached.
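
A minimal sketch of the idea in the commit message above (FlowFiles without the grouping attribute go to the default group). This is not the code from the commit: the class name, the resolveGroup helper and the DEFAULT_GROUP value are hypothetical, and a plain Map stands in for FlowFile attributes.

```java
import java.util.Map;

public class GroupingSketch {

    // Hypothetical constant; the real processor may use a different name or value.
    static final String DEFAULT_GROUP = "__default_group__";

    // Resolve the rate-limiting group for a FlowFile's attributes.
    // A missing grouping attribute falls back to the default group
    // instead of yielding null, which callers might otherwise dereference
    // or use as a map key.
    public static String resolveGroup(Map<String, String> attributes, String groupingAttributeName) {
        if (groupingAttributeName == null) {
            return DEFAULT_GROUP;
        }
        final String value = attributes.get(groupingAttributeName);
        return value != null ? value : DEFAULT_GROUP;
    }
}
```

With the configuration from the report (Grouping Attribute: foobar), a FlowFile lacking foobar would be counted under the default group rather than triggering an exception.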



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] nifi pull request #2345: NIFI-4629: Put flowfiles without the grouping attri...

2017-12-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2345


---


[jira] [Commented] (NIFI-2169) Improve RouteText performance with pre-compilation of RegEx in certain cases

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292703#comment-16292703
 ] 

ASF GitHub Bot commented on NIFI-2169:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2343#discussion_r157230846
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
 ---
@@ -209,6 +215,30 @@
 private volatile Map propertyMap = new 
HashMap<>();
 private volatile Pattern groupingRegex = null;
 
+@VisibleForTesting
+final static int PATTERNS_CACHE_MAXIMUM_ENTRIES = 10;
--- End diff --

We could probably cache more than 10 here. I think the idea on the PR was 
simply to convey that we need a reasonable upper bound, rather than allowing 
the cache to grow indefinitely. Personally, I would lean more toward, say, 100, 
or even 1024 or so. I believe a compiled Pattern is fairly small in terms of 
heap utilization, so I wouldn't be concerned with such a limit.


> Improve RouteText performance with pre-compilation of RegEx in certain cases
> 
>
> Key: NIFI-2169
> URL: https://issues.apache.org/jira/browse/NIFI-2169
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Core Framework
>Affects Versions: 0.6.1
>Reporter: Stephane Maarek
>Assignee: Oleg Zhurakousky
>  Labels: beginner, easy
>
> When using RegEx matches for the RouteText processor (and possibly other 
> processors), the RegEx gets recompiled every time the processor works. The 
> RegEx could be precompiled / cached under certain conditions, in order to 
> improve the performance of the processor
> See email from Mark Payne:
> Re #2: The regular expression is compiled every time. This is done, though, 
> because the Regex allows the Expression
> Language to be used, so the Regex could actually be different for each 
> FlowFile. That being said, it could certainly be
> improved by either (a) pre-compiling in the case that no Expression Language 
> is used and/or (b) cache up to say 10
> Regex'es once they are compiled. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (NIFI-2169) Improve RouteText performance with pre-compilation of RegEx in certain cases

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292701#comment-16292701
 ] 

ASF GitHub Bot commented on NIFI-2169:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2343#discussion_r157231329
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
 ---
@@ -209,6 +215,30 @@
 private volatile Map propertyMap = new 
HashMap<>();
 private volatile Pattern groupingRegex = null;
 
+@VisibleForTesting
+final static int PATTERNS_CACHE_MAXIMUM_ENTRIES = 10;
+
+/**
+ * LRU cache for the compiled patterns. The size of the cache is 
determined by the value of
+ * {@link #PATTERNS_CACHE_MAXIMUM_ENTRIES}.
+ */
+@VisibleForTesting
+final ConcurrentMap, Pattern> patternsCache = 
CacheBuilder.newBuilder()
--- End diff --

I think we can simplify this somewhat. Because 'ignore case' is 
going to be true for all regexes or false for all regexes, I think we can get 
rid of the Pair and just use String as the key. Then we'd just 
need to ensure that we clear the cache in the @OnScheduled method, or in 
onPropertyModified if that property is changed.
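
A rough sketch of that suggestion, under a few assumptions: a plain ConcurrentHashMap stands in for the size-bounded Guava cache in the PR (to keep the example dependency-free), and setIgnoreCase is a hypothetical hook that would be called from @OnScheduled or onPropertyModified. The class name PatternCacheSketch is made up.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;

public class PatternCacheSketch {

    // Keyed by the regex string alone, since the ignore-case flag applies
    // to every regex handled by the processor at a given time.
    private final ConcurrentMap<String, Pattern> patternsCache = new ConcurrentHashMap<>();
    private volatile boolean ignoreCase;

    // Hypothetical hook: invoke when the ignore-case property changes.
    public void setIgnoreCase(boolean newValue) {
        ignoreCase = newValue;
        // Cached patterns were compiled with the old flag, so drop them.
        patternsCache.clear();
    }

    public Pattern cacheCompiledPattern(String regex) {
        final boolean caseInsensitive = ignoreCase;
        return patternsCache.computeIfAbsent(regex, toCompile -> caseInsensitive
                ? Pattern.compile(toCompile, Pattern.CASE_INSENSITIVE)
                : Pattern.compile(toCompile));
    }

    public int size() {
        return patternsCache.size();
    }
}
```

This assumes the flag only changes while the processor is not running; if it could change concurrently with lookups, a stale pattern could be cached after the clear.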




[jira] [Commented] (NIFI-2169) Improve RouteText performance with pre-compilation of RegEx in certain cases

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292702#comment-16292702
 ] 

ASF GitHub Bot commented on NIFI-2169:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2343#discussion_r157232403
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
 ---
@@ -209,6 +215,30 @@
 private volatile Map propertyMap = new 
HashMap<>();
 private volatile Pattern groupingRegex = null;
 
+@VisibleForTesting
+final static int PATTERNS_CACHE_MAXIMUM_ENTRIES = 10;
+
+/**
+ * LRU cache for the compiled patterns. The size of the cache is 
determined by the value of
+ * {@link #PATTERNS_CACHE_MAXIMUM_ENTRIES}.
+ */
+@VisibleForTesting
+final ConcurrentMap, Pattern> patternsCache = 
CacheBuilder.newBuilder()
+.maximumSize(PATTERNS_CACHE_MAXIMUM_ENTRIES)
+., Pattern>build()
+.asMap();
+
+private final Function, Pattern> compileRegex = 
ignoreCaseAndRegex -> {
--- End diff --

Again, I would avoid the use of the Pair here... and 
really would probably avoid the Function altogether. Since it seems to be 
referenced only once, I'd prefer to just inline it in the 
cacheCompiledPattern method, so that there we could just call something like:

`return patternsCache.computeIfAbsent(key, toCompile -> ignoreCase ? 
Pattern.compile(toCompile, Pattern.CASE_INSENSITIVE) : 
Pattern.compile(toCompile));`




[jira] [Commented] (NIFI-2169) Improve RouteText performance with pre-compilation of RegEx in certain cases

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292711#comment-16292711
 ] 

ASF GitHub Bot commented on NIFI-2169:
--

Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/2343
  
@mgaido91 sorry I didn't see Joe's comments until I made that last one :)




[GitHub] nifi issue #2343: NIFI-2169: Cache compiled regexp for RouteText

2017-12-15 Thread markap14
Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/2343
  
@mgaido91 sorry I didn't see Joe's comments until I made that last one :)


---


[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292712#comment-16292712
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157234799
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/resources/docs/org.apache.nifi.atlas.reporting.AtlasNiFiFlowLineage/additionalDetails.html
 ---
@@ -0,0 +1,541 @@
+
+
+
+
+
+AtlasNiFiFlowLineage
+
+
+
+
+AtlasNiFiFlowLineage
+
+Table of contents:
+
+
+Information reported to 
Atlas
+NiFi Atlas Types
+Cluster Name Resolution
+NiFi flow structure
+
+Path Separation 
Logic
+
+
+NiFi data lineage
+
+NiFi Lineage 
Strategy
+NiFi Provenance Event 
Analysis
+Supported 
DataSets and Processors
+
+
+How it runs in NiFi 
cluster
+Limitations
+Atlas Server 
Configurations
+Atlas Server Emulator
+
+
+Information reported to Atlas
+This reporting task stores two types of NiFi flow information, 
'NiFi flow structure' and 'NiFi data lineage'.
+
+'NiFi flow structure' tells what components are running within 
a NiFi flow and how these are connected. It is reported by analyzing current 
NiFi flow structure, specifically NiFi component relationships.
+
+'NiFi data lineage' tells what part of NiFi flow interacts with 
different DataSets such as HDFS files or Hive tables ... etc. It is reported by 
analyzing NiFi provenance events.
+
+
+
+Technically, each type of information is sent using a different protocol, 
Atlas REST API v2 or notification via a Kafka topic, as shown in the above 
image.
+
+
+As both information types use the same NiFi Atlas Types and Cluster Name Resolution concepts, it is recommended to 
start reading those sections first.
+
+NiFi Atlas Types
+
+This reporting task creates the following NiFi-specific types in the 
Atlas type system when it runs, if these type definitions are not found.
+
+Green boxes represent sub-types of DataSet and blue ones are 
sub-types of Process. Gray lines represent entity ownership.
+Red lines represent lineage.
+
+
+
+
+nifi_flow
+Represents a NiFi data flow.
+As shown in the above diagram, nifi_flow owns other 
nifi_component types.
+This owning relationship is defined by the Atlas 'owned' 
constraint, so that when a 'nifi_flow' entity is removed, all owned NiFi 
component entities are removed in a cascading manner.
+When this reporting task runs, it analyzes and traverses 
the entire flow structure, and creates NiFi component entities in Atlas.
+On later runs, it compares the current flow structure 
with the one stored in Atlas to figure out whether any changes have been made since 
the last time the flow was reported. The reporting task updates NiFi component 
entities in Atlas if needed.
+NiFi components that are removed from a NiFi flow are also 
deleted from Atlas.
+However, those entities can still be seen in Atlas 
search results or lineage graphs, since Atlas uses 'Soft Delete' by default.
+See Atlas Delete Handler 
for further detail.
+
+Attributes:
+
+qualifiedName: Root ProcessGroup ID@clusterName (e.g. 
86420a14-2fab-3e1e-4331-fb6ab42f58e0@cl1)
+name: Name of the Root ProcessGroup.
+url: URL of the NiFi instance. This can be specified 
via reporting task 'NiFi URL for Atlas' property.
+
+
+
+nifi_flow_path Part of a NiFi data flow containing one 
or more processing NiFi components such as Processors and RemoteGroupPorts. The 
reporting task divides a NiFi flow into multiple flow paths. See Path Separation Logic for details.
+Attributes:
+
+qualifiedName: The first NiFi component Id in a 
path@clusterName (e.g. 529e6722-9b49-3b66-9c94-00da9863ca2d@cl1)
+name: NiFi component names within a path are 
concatenated (e.g. GenerateFlowFile, PutFile, LogAttribute)
+url: A deep 

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292724#comment-16292724
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157236574
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+private final Logger logger = 
LoggerFactory.getLogger(StandardAnalysisContext.class);
+private final NiFiFlow nifiFlow;
+private final ClusterResolver clusterResolver;
+private final ProvenanceRepository provenanceRepository;
+
+public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver 
clusterResolver,
+   ProvenanceRepository 
provenanceRepository) {
+this.nifiFlow = nifiFlow;
+this.clusterResolver = clusterResolver;
+this.provenanceRepository = provenanceRepository;
+}
+
+@Override
+public List<ConnectionStatus> findConnectionTo(String componentId) {
+return nifiFlow.getIncomingRelationShips(componentId);
+}
+
+@Override
+public List<ConnectionStatus> findConnectionFrom(String componentId) {
+return nifiFlow.getOutgoingRelationShips(componentId);
+}
+
+@Override
+public String getNiFiClusterName() {
+return nifiFlow.getClusterName();
+}
+
+@Override
+public ClusterResolver getClusterResolver() {
+return clusterResolver;
+}
+
+private ComputeLineageResult getLineageResult(long eventId, 
ComputeLineageSubmission submission) {
+final ComputeLineageResult result = submission.getResult();
+try {
+if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
+return result;
+}
+logger.warn("Lineage query for {} timed out.", new 
Object[]{eventId});
+} catch (InterruptedException e) {
+logger.warn("Lineage query for {} was interrupted due to {}.", 
new Object[]{eventId, e}, e);
+} finally {
+submission.cancel();
+}
+
+return null;
+}
+
+@Override
+public ComputeLineageResult queryLineage(long eventId) {
+final ComputeLineageSubmission submission = 
provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+public ComputeLineageResult findParents(long eventId) {
+final ComputeLineageSubmission submission = 
provenanceRepository.submitExpandParents(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+// NOTE: This user is required to avoid NullPointerException at 
PersistentProvenanceRepository.submitLineageComputation
+private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
--- End diff --

@ijokarumawak 

[jira] [Commented] (MINIFICPP-330) Implement substring operations in Expression Language

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292725#comment-16292725
 ] 

ASF GitHub Bot commented on MINIFICPP-330:
--

Github user achristianson commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/219
  
@phrocker tests added.


> Implement substring operations in Expression Language
> -
>
> Key: MINIFICPP-330
> URL: https://issues.apache.org/jira/browse/MINIFICPP-330
> Project: NiFi MiNiFi C++
>  Issue Type: Improvement
>Reporter: Andrew Christianson
>Assignee: Andrew Christianson
>
> Add support for these substring functions to EL:
> substring
> substringBefore
> substringBeforeLast
> substringAfter
> substringAfterLast
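
For readers unfamiliar with these functions, the following is a minimal Java sketch of the intended semantics. It is not the MiNiFi C++ implementation from the pull request. The class name `SubstringOps` is hypothetical, and the behavior when the delimiter is absent (return the whole subject) is an assumption taken from the NiFi Expression Language guide; the C++ port may differ.

```java
// Hypothetical illustration of the four delimiter-based substring functions.
// Assumption: when the delimiter is not found, the whole subject is returned.
final class SubstringOps {
    private SubstringOps() {
    }

    // Text before the first occurrence of delim.
    static String substringBefore(final String subject, final String delim) {
        final int i = subject.indexOf(delim);
        return i < 0 ? subject : subject.substring(0, i);
    }

    // Text before the last occurrence of delim.
    static String substringBeforeLast(final String subject, final String delim) {
        final int i = subject.lastIndexOf(delim);
        return i < 0 ? subject : subject.substring(0, i);
    }

    // Text after the first occurrence of delim.
    static String substringAfter(final String subject, final String delim) {
        final int i = subject.indexOf(delim);
        return i < 0 ? subject : subject.substring(i + delim.length());
    }

    // Text after the last occurrence of delim.
    static String substringAfterLast(final String subject, final String delim) {
        final int i = subject.lastIndexOf(delim);
        return i < 0 ? subject : subject.substring(i + delim.length());
    }
}
```

With the subject `a.b.c` and the delimiter `.`, the four functions return `a`, `a.b`, `b.c` and `c` respectively.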





[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread markap14
Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157236574
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/StandardAnalysisContext.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance;
+
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class StandardAnalysisContext implements AnalysisContext {
+
+private final Logger logger = 
LoggerFactory.getLogger(StandardAnalysisContext.class);
+private final NiFiFlow nifiFlow;
+private final ClusterResolver clusterResolver;
+private final ProvenanceRepository provenanceRepository;
+
+public StandardAnalysisContext(NiFiFlow nifiFlow, ClusterResolver 
clusterResolver,
+   ProvenanceRepository 
provenanceRepository) {
+this.nifiFlow = nifiFlow;
+this.clusterResolver = clusterResolver;
+this.provenanceRepository = provenanceRepository;
+}
+
+@Override
+public List<ConnectionStatus> findConnectionTo(String componentId) {
+return nifiFlow.getIncomingRelationShips(componentId);
+}
+
+@Override
+public List<ConnectionStatus> findConnectionFrom(String componentId) {
+return nifiFlow.getOutgoingRelationShips(componentId);
+}
+
+@Override
+public String getNiFiClusterName() {
+return nifiFlow.getClusterName();
+}
+
+@Override
+public ClusterResolver getClusterResolver() {
+return clusterResolver;
+}
+
+private ComputeLineageResult getLineageResult(long eventId, 
ComputeLineageSubmission submission) {
+final ComputeLineageResult result = submission.getResult();
+try {
+if (result.awaitCompletion(10, TimeUnit.SECONDS)) {
+return result;
+}
+logger.warn("Lineage query for {} timed out.", new 
Object[]{eventId});
+} catch (InterruptedException e) {
+logger.warn("Lineage query for {} was interrupted due to {}.", 
new Object[]{eventId, e}, e);
+} finally {
+submission.cancel();
+}
+
+return null;
+}
+
+@Override
+public ComputeLineageResult queryLineage(long eventId) {
+final ComputeLineageSubmission submission = 
provenanceRepository.submitLineageComputation(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+public ComputeLineageResult findParents(long eventId) {
+final ComputeLineageSubmission submission = 
provenanceRepository.submitExpandParents(eventId, NIFI_USER);
+return getLineageResult(eventId, submission);
+}
+
+// NOTE: This user is required to avoid NullPointerException at 
PersistentProvenanceRepository.submitLineageComputation
+private static final QueryNiFiUser NIFI_USER = new QueryNiFiUser();
--- End diff --

@ijokarumawak The authentication is done at the web request layer and 
authorization for making a provenance query is done there, but the event-level 
authorization is not. That is done in the repository itself. So in a secure 
environment, the lineage 

[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292658#comment-16292658
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157222062
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFIAtlasHook.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import 
org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.nifi.atlas.provenance.lineage.LineageContext;
+import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+
+/**
+ * This class is not thread-safe as it holds uncommitted notification 
messages within instance.
+ * {@link #addMessage(HookNotificationMessage)} and {@link 
#commitMessages()} should be used serially from a single thread.
+ */
+public class NiFIAtlasHook extends AtlasHook implements LineageContext {
+
+public static final String NIFI_USER = "nifi";
+
+private static final Logger logger = 
LoggerFactory.getLogger(NiFIAtlasHook.class);
+private static final String CONF_PREFIX = "atlas.hook.nifi.";
+private static final String HOOK_NUM_RETRIES = CONF_PREFIX + 
"numRetries";
+
+private final NiFiAtlasClient atlasClient;
+
+/**
+ * An index to resolve a qualifiedName from a GUID.
+ */
+private final Map<String, String> guidToQualifiedName;
+/**
+ * An index to resolve a Referenceable from a typeName::qualifiedName.
+ */
+private final Map<String, Referenceable> typedQualifiedNameToRef;
+
+
+private static <K, V> Map<K, V> createCache(final int maxSize) {
+return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
--- End diff --

Well, it's just a private method to create cache instances within this 
class. So if different default optimizations are needed, then we can do that 
here as well without affecting others.
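
The quoted diff is cut off right after the `LinkedHashMap` constructor call, so the eviction logic is not visible here. As a rough sketch of how such a cache factory is commonly written in Java, assuming eviction is done by overriding `removeEldestEntry` (the class name `LruCaches` is hypothetical, and this is not necessarily the code in the pull request):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder class; only createCache mirrors the method named in the diff.
final class LruCaches {
    private LruCaches() {
    }

    // Returns a map that keeps at most maxSize entries and evicts the
    // least recently accessed entry once that limit is exceeded.
    static <K, V> Map<K, V> createCache(final int maxSize) {
        // The third constructor argument (true) selects access order,
        // so get() and put() both move an entry to the "most recent" end.
        return new LinkedHashMap<K, V>(maxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }
}
```

Note that a map built this way is not thread-safe, which is consistent with the class Javadoc in the diff stating that the hook should be used from a single thread.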


> Export NiFi flow dataset lineage to Apache Atlas
> 
>
> Key: NIFI-3709
> URL: https://issues.apache.org/jira/browse/NIFI-3709
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>
> While Apache NiFi has provenance and event level lineage support within its 
> data flow, Apache Atlas also does manage lineage 

[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157223751
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+@Override
+public void processEvent(AnalysisContext analysisContext, NiFiFlow 
nifiFlow, ProvenanceEventRecord event) {
+final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+if (refs == null || (refs.isEmpty())) {
+return;
+}
+
+if ("Remote Input Port".equals(event.getComponentType()) || 
"Remote Output Port".equals(event.getComponentType())) {
+processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+} else {
+addDataSetRefs(nifiFlow, refs);
+}
+
+}
+
+/**
+ * Create a flow_path entity corresponding to the target 
RemoteGroupPort when a SEND/RECEIVE event are received.
+ * Because such entity can not be created in advance while analyzing 
flow statically,
+ * as ReportingTask can not determine whether a component id is a 
RemoteGroupPort,
+ * since connectionStatus is the only available information in 
ReportingContext.
+ * ConnectionStatus only knows component id, component type is unknown.
+ * For example, there is no difference to tell if a connected 
component is a funnel or a RemoteGroupPort.
+ */
+private void processRemotePortEvent(AnalysisContext analysisContext, 
NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+final boolean isRemoteInputPort = "Remote Input 
Port".equals(event.getComponentType());
+
+// Create a RemoteInputPort Process.
+// event.getComponentId returns UUID for RemoteGroupPort as a 
client of S2S, and it's different from a remote port UUID (portDataSetid).
+// See NIFI-4571 for detail.
+final Referenceable remotePortDataSet = isRemoteInputPort ? 
analyzedRefs.getOutputs().iterator().next() :  
analyzedRefs.getInputs().iterator().next();
+final String portProcessId = event.getComponentId();
+
+final NiFiFlowPath remotePortProcess = new 
NiFiFlowPath(portProcessId);
+remotePortProcess.setName(event.getComponentType());
+remotePortProcess.addProcessor(portProcessId);
+
+// For RemoteInputPort, need to find the previous component 
connected to this port,
+// which passed this particular FlowFile.
+// That is only possible by calling lineage API.
+if (isRemoteInputPort) {
+final ProvenanceEventRecord previousEvent = 
findPreviousProvenanceEvent(analysisContext, event);
+if (previousEvent == null) {
+logger.warn("Previous event was not found: {}", new 
Object[]{event});
+

[jira] [Commented] (MINIFICPP-330) Implement substring operations in Expression Language

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292667#comment-16292667
 ] 

ASF GitHub Bot commented on MINIFICPP-330:
--

Github user achristianson commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/219
  
@phrocker yes, if they call substringBefore on an attr with no other 
arguments, then they should get an "attempted to call incomplete function." 
I'll add a test case for that.


> Implement substring operations in Expression Language
> -
>
> Key: MINIFICPP-330
> URL: https://issues.apache.org/jira/browse/MINIFICPP-330
> Project: NiFi MiNiFi C++
>  Issue Type: Improvement
>Reporter: Andrew Christianson
>Assignee: Andrew Christianson
>
> Add support for these substring functions to EL:
> substring
> substringBefore
> substringBeforeLast
> substringAfter
> substringAfterLast





[jira] [Commented] (NIFI-2169) Improve RouteText performance with pre-compilation of RegEx in certain cases

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292668#comment-16292668
 ] 

ASF GitHub Bot commented on NIFI-2169:
--

Github user joewitt commented on the issue:

https://github.com/apache/nifi/pull/2343
  
@mgaido91 it failed due to a checkstyle issue.  You'll want to look at the 
raw logs and scroll to the bottom.  The key issue is

[WARNING] 
src/test/java/org/apache/nifi/processors/standard/TestRouteText.java:[34,8] 
(imports) UnusedImports: Unused import - 
org.apache.nifi.util.MockProcessSession.


> Improve RouteText performance with pre-compilation of RegEx in certain cases
> 
>
> Key: NIFI-2169
> URL: https://issues.apache.org/jira/browse/NIFI-2169
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Core Framework
>Affects Versions: 0.6.1
>Reporter: Stephane Maarek
>Assignee: Oleg Zhurakousky
>  Labels: beginner, easy
>
> When using RegEx matches for the RouteText processor (and possibly other 
> processors), the RegEx gets recompiled every time the processor works. The 
> RegEx could be precompiled / cached under certain conditions, in order to 
> improve the performance of the processor
> See email from Mark Payne:
> Re #2: The regular expression is compiled every time. This is done, though, 
> because the Regex allows the Expression
> Language to be used, so the Regex could actually be different for each 
> FlowFile. That being said, it could certainly be
> improved by either (a) pre-compiling in the case that no Expression Language 
> is used and/or (b) cache up to say 10
> Regex'es once they are compiled. 
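
The two options in the quoted email can be sketched in Java roughly as follows. This is an illustration only, not the change made in PR #2343. The class `RegexResolver` is hypothetical, the check for `${` is an assumed stand-in for detecting Expression Language, and the caller is assumed to evaluate any Expression Language before calling `resolve`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper, not part of NiFi.
final class RegexResolver {
    private static final int MAX_CACHED = 10; // "up to say 10" from the email

    // Option (a): non-null only when the configured value has no Expression Language.
    private final Pattern precompiled;
    // Option (b): small access-ordered cache of compiled patterns.
    private final Map<String, Pattern> cache;

    RegexResolver(final String configuredValue) {
        // Assumption: "${" marks Expression Language usage in the property value.
        this.precompiled = configuredValue.contains("${") ? null : Pattern.compile(configuredValue);
        this.cache = new LinkedHashMap<String, Pattern>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<String, Pattern> eldest) {
                return size() > MAX_CACHED;
            }
        };
    }

    // evaluatedRegex is the regex after Expression Language evaluation.
    // It is ignored when the configured value was a literal regex.
    synchronized Pattern resolve(final String evaluatedRegex) {
        if (precompiled != null) {
            return precompiled;
        }
        return cache.computeIfAbsent(evaluatedRegex, Pattern::compile);
    }

    synchronized int cachedCount() {
        return cache.size();
    }
}
```

The methods are synchronized because a processor may run on several threads at once; a real implementation might prefer a concurrent cache instead.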





[jira] [Updated] (NIFI-4629) Non-existing attribute in ControlRate configuration causes NullPointerException

2017-12-15 Thread Joseph Witt (JIRA)

 [ 
https://issues.apache.org/jira/browse/NIFI-4629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph Witt updated NIFI-4629:
--
Fix Version/s: 1.5.0

> Non-existing attribute in ControlRate configuration causes 
> NullPointerException
> ---
>
> Key: NIFI-4629
> URL: https://issues.apache.org/jira/browse/NIFI-4629
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Configuration, Core UI
>Affects Versions: 1.4.0
> Environment: Ubuntu LTS 16.04
> openjdk-8-jre
>Reporter: Fredrik Sko
>Priority: Critical
>  Labels: ControlRate, NullPointerException, Processor
> Fix For: 1.5.0
>
> Attachments: ControlRate-NP.xml
>
>
> When using the ControlRate processor, defining the "Grouping Attribute" with 
> a missing/non-existing attribute name produces NullPointerException errors.
> Processor configuration:
> {quote}
> Rate Control Criteria: flowfile count
> Maximum rate: 10
> Rate Controlled Attributes: (No value set)
> Time Duration: 1 min
> Grouping Attribute: foobar
> {quote}
> With the above configuration, ControlRate generates the following error when 
> sent a flowfile without the attribute {{foobar}}:
> bq. ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] 
> ControlRate\[id=dff05b32-015f-1000-db55-5957a9298bab] failed to process due 
> to java.lang.NullPointerException; rolling back session: null
> Additionally, the incoming flowfiles now end up in some "dead" state where 
> I'm unable to even empty the queue.
> A simple template for reproduction is attached.





[jira] [Commented] (NIFI-2169) Improve RouteText performance with pre-compilation of RegEx in certain cases

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292710#comment-16292710
 ] 

ASF GitHub Bot commented on NIFI-2169:
--

Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/2343
  
@mgaido91 Travis-CI also reports the following checkstyle violation:
`[WARNING] 
src/test/java/org/apache/nifi/processors/standard/TestRouteText.java:[34,8] 
(imports) UnusedImports: Unused import - 
org.apache.nifi.util.MockProcessSession.
`



> Improve RouteText performance with pre-compilation of RegEx in certain cases
> 
>
> Key: NIFI-2169
> URL: https://issues.apache.org/jira/browse/NIFI-2169
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Core Framework
>Affects Versions: 0.6.1
>Reporter: Stephane Maarek
>Assignee: Oleg Zhurakousky
>  Labels: beginner, easy
>
> When using RegEx matches for the RouteText processor (and possibly other 
> processors), the RegEx gets recompiled every time the processor works. The 
> RegEx could be precompiled / cached under certain conditions, in order to 
> improve the performance of the processor
> See email from Mark Payne:
> Re #2: The regular expression is compiled every time. This is done, though, 
> because the Regex allows the Expression
> Language to be used, so the Regex could actually be different for each 
> FlowFile. That being said, it could certainly be
> improved by either (a) pre-compiling in the case that no Expression Language 
> is used and/or (b) cache up to say 10
> Regex'es once they are compiled. 





[jira] [Commented] (NIFI-3709) Export NiFi flow dataset lineage to Apache Atlas

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-3709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292728#comment-16292728
 ] 

ASF GitHub Bot commented on NIFI-3709:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157237190
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.provenance.lineage;
+
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowPath;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.DataSetRefs;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageResult;
+import org.apache.nifi.provenance.lineage.LineageEdge;
+import org.apache.nifi.provenance.lineage.LineageNode;
+import org.apache.nifi.provenance.lineage.LineageNodeType;
+
+import java.util.List;
+
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
+import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_QUEUE;
+
+public class SimpleFlowPathLineage extends AbstractLineageStrategy {
+
+@Override
+public void processEvent(AnalysisContext analysisContext, NiFiFlow 
nifiFlow, ProvenanceEventRecord event) {
+final DataSetRefs refs = executeAnalyzer(analysisContext, event);
+if (refs == null || (refs.isEmpty())) {
+return;
+}
+
+if ("Remote Input Port".equals(event.getComponentType()) || 
"Remote Output Port".equals(event.getComponentType())) {
+processRemotePortEvent(analysisContext, nifiFlow, event, refs);
+} else {
+addDataSetRefs(nifiFlow, refs);
+}
+
+}
+
+/**
+ * Create a flow_path entity corresponding to the target 
RemoteGroupPort when a SEND/RECEIVE event are received.
+ * Because such entity can not be created in advance while analyzing 
flow statically,
+ * as ReportingTask can not determine whether a component id is a 
RemoteGroupPort,
+ * since connectionStatus is the only available information in 
ReportingContext.
+ * ConnectionStatus only knows component id, component type is unknown.
+ * For example, there is no difference to tell if a connected 
component is a funnel or a RemoteGroupPort.
+ */
+private void processRemotePortEvent(AnalysisContext analysisContext, 
NiFiFlow nifiFlow, ProvenanceEventRecord event, DataSetRefs analyzedRefs) {
+
+final boolean isRemoteInputPort = "Remote Input 
Port".equals(event.getComponentType());
+
+// Create a RemoteInputPort Process.
+// event.getComponentId returns the UUID of the RemoteGroupPort acting as an S2S client, which differs from the remote port UUID (portDataSetid).
+// See NIFI-4571 for details.
+final Referenceable remotePortDataSet = isRemoteInputPort ? analyzedRefs.getOutputs().iterator().next() : analyzedRefs.getInputs().iterator().next();
+final String portProcessId = event.getComponentId();
+
+final NiFiFlowPath remotePortProcess = new NiFiFlowPath(portProcessId);
+remotePortProcess.setName(event.getComponentType());
+remotePortProcess.addProcessor(portProcessId);
+
+// For RemoteInputPort, we need to find the previous component connected to this port,
+// which passed this particular FlowFile.
+// That is only possible by calling the lineage API.
+if (isRemoteInputPort) {
+final ProvenanceEventRecord 

[GitHub] nifi-minifi-cpp issue #216: MINIFICPP-341 Add CentOS6 build instructions

2017-12-15 Thread minifirocks
Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/216
  
Please resolve the conflict.


---


[jira] [Commented] (MINIFICPP-341) Add CentOS6 build instructions

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292743#comment-16292743
 ] 

ASF GitHub Bot commented on MINIFICPP-341:
--

Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/216
  
Please resolve the conflict.


> Add CentOS6 build instructions
> --
>
> Key: MINIFICPP-341
> URL: https://issues.apache.org/jira/browse/MINIFICPP-341
> Project: NiFi MiNiFi C++
> Issue Type: Improvement
> Reporter: Andrew Christianson
> Assignee: Andrew Christianson
>
> Add docs for building on CentOS6





[jira] [Commented] (MINIFICPP-41) Create C++ library for core API

2017-12-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/MINIFICPP-41?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292741#comment-16292741
 ] 

ASF GitHub Bot commented on MINIFICPP-41:
-

Github user minifirocks commented on the issue:

https://github.com/apache/nifi-minifi-cpp/pull/217
  
Please merge these two commits.


> Create C++ library for core API
> ---
>
> Key: MINIFICPP-41
> URL: https://issues.apache.org/jira/browse/MINIFICPP-41
> Project: NiFi MiNiFi C++
> Issue Type: Task
> Reporter: Aldrin Piri
>
> To make it possible for folks to create extensions and build on top of the 
> MiNiFi C++ framework, a linkable core API library extracted from the 
> existing codebase should be provided.





[GitHub] nifi pull request #2335: NIFI-3709: Export NiFi flow dataset lineage to Apac...

2017-12-15 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2335#discussion_r157217462
  
--- Diff: 
nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/AtlasNiFiFlowLineage.java
 ---
@@ -0,0 +1,714 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.atlas.reporting;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.atlas.NiFIAtlasHook;
+import org.apache.nifi.atlas.NiFiAtlasClient;
+import org.apache.nifi.atlas.NiFiFlow;
+import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.provenance.AnalysisContext;
+import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
+import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
+import org.apache.nifi.atlas.provenance.lineage.LineageStrategy;
+import org.apache.nifi.atlas.provenance.lineage.SimpleFlowPathLineage;
+import org.apache.nifi.atlas.resolver.ClusterResolver;
+import org.apache.nifi.atlas.resolver.ClusterResolvers;
+import org.apache.nifi.atlas.resolver.RegexClusterResolver;
+import org.apache.nifi.atlas.security.AtlasAuthN;
+import org.apache.nifi.atlas.security.Basic;
+import org.apache.nifi.atlas.security.Kerberos;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
+
  
