[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-11-29 Thread ASF subversion and git services (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702923#comment-16702923
 ] 

ASF subversion and git services commented on NIFI-5406:
---

Commit 30f2f4205121113c26bb00ac5a8697dffaeb8206 in nifi's branch 
refs/heads/master from [~ijokarumawak]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=30f2f42 ]

NIFI-5849: ListXXX can lose cluster state on processor restart

NIFI-5406 introduced the issue by trying to use the resetState variable for
different purposes. AbstractListProcessor should have had a different variable
to control whether to clear state for tracking entity strategy.

Signed-off-by: Pierre Villard 

This closes #3189.


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
> Fix For: 1.8.0
>
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the already-listed entities
>  *** Having newer last-modified-timestamp
>  *** Having different size
>  ** Already listed entries those are old enough compared to 'Watch Time 
> Window' are discarded from the already-listed entries.
>  * Initial supporting target is Local file system, FTP and SFTP



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546954#comment-16546954
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2876


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
> Fix For: 1.8.0
>
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the already-listed entities
>  *** Having newer last-modified-timestamp
>  *** Having different size
>  ** Already listed entries those are old enough compared to 'Watch Time 
> Window' are discarded from the already-listed entries.
>  * Initial supporting target is Local file system, FTP and SFTP



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546953#comment-16546953
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/2876
  
All of my testing looks good as well. +1 merged to master. Thanks for the 
improvement, @ijokarumawak !


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the already-listed entities
>  *** Having newer last-modified-timestamp
>  *** Having different size
>  ** Already listed entries those are old enough compared to 'Watch Time 
> Window' are discarded from the already-listed entries.
>  * Initial supporting target is Local file system, FTP and SFTP



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546911#comment-16546911
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/2876
  
@ijokarumawak thanks for the updates! Code looks good! I will run a few 
tests and if all looks good will merge to master. Thanks!


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the already-listed entities
>  *** Having newer last-modified-timestamp
>  *** Having different size
>  ** Already listed entries those are old enough compared to 'Watch Time 
> Window' are discarded from the already-listed entries.
>  * Initial supporting target is Local file system, FTP and SFTP



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546089#comment-16546089
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on the issue:

https://github.com/apache/nifi/pull/2876
  
@markap14 I believe all review comments are addressed now. Also, fixed few 
things found during tests under clustered environment. And changed default 
initial listing target from 'window' to 'all' to make it behave the same as 
existing listing does. I hope it's ready to be merged. Thanks for reviewing and 
insightful feedback!


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the already-listed entities
>  *** Having newer last-modified-timestamp
>  *** Having different size
>  ** Already listed entries those are old enough compared to 'Watch Time 
> Window' are discarded from the already-listed entries.
>  * Initial supporting target is Local file system, FTP and SFTP



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546087#comment-16546087
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202906360
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546074#comment-16546074
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202902757
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546070#comment-16546070
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202902116
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntity.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+public class ListedEntity {
+/**
+ * Milliseconds.
+ */
+private long timestamp;
+/**
+ * Bytes.
+ */
+private long size;
+
+public void setTimestamp(long timestamp) {
--- End diff --

You are right. I was using final fields and constructor, but when it's 
deserialized, I got an Exception around Jackson, so I added setters. I've added 
Jackson annotations to use custom constructor for deserialization. Now it's 
clear those are immutable.


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the already-listed entities
>  *** Having newer last-modified-timestamp
>  *** Having different size
>  ** Already listed entries those are old enough 

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546068#comment-16546068
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202901813
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
--- End diff --

Updated to use ConcurrentHashMap. Thanks.


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the 

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546067#comment-16546067
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202901766
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
--- End diff --

Done.


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer 

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546066#comment-16546066
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202901745
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546064#comment-16546064
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202901658
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546065#comment-16546065
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202901676
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546063#comment-16546063
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202901585
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546062#comment-16546062
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202901570
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546061#comment-16546061
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202901279
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
 ---
@@ -169,6 +179,28 @@
 .description("All FlowFiles that are received are routed to 
success")
 .build();
 
+public static final AllowableValue BY_TIMESTAMPS = new 
AllowableValue("timestamps", "Tracking Timestamps",
+"This strategy tracks the latest timestamp of listed entity to 
determine new/updated entities." +
+" Since it only tracks few timestamps, it can manage 
listing state efficiently." +
+" However, any newly added, or updated entity having 
timestamp older than the tracked latest timestamp can not be picked by this 
strategy." +
+" For example, such situation can happen in a file 
system if a file with old timestamp" +
+" is copied or moved into the target directory without 
its last modified timestamp being updated.");
+
+public static final AllowableValue BY_ENTITIES = new 
AllowableValue("entities", "Tracking Entities",
+"This strategy tracks information of all the listed entities 
within the latest 'Entity Tracking Time Window' to determine new/updated 
entities." +
+" See 'Entity Tracking Time Window' description for 
detail on how it works." +
--- End diff --

I wanted to guide user to look at 'Entity Tracking Time Window' property's 
description. Updated the text.


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the already-listed entities
>  *** Having newer last-modified-timestamp
>  *** Having different size
>  ** Already listed entries those are old enough compared to 'Watch Time 
> Window' are discarded from the already-listed 

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545456#comment-16545456
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202748684
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545373#comment-16545373
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202716202
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545378#comment-16545378
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202720481
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
--- End diff --

Should probably mark member variable as `final` if not changing.


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in 

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545374#comment-16545374
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202719150
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545375#comment-16545375
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202719634
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545379#comment-16545379
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202722240
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntity.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+public class ListedEntity {
+/**
+ * Milliseconds.
+ */
+private long timestamp;
+/**
+ * Bytes.
+ */
+private long size;
+
+public void setTimestamp(long timestamp) {
--- End diff --

I don't think these are ever called other than immediately after creating 
the object, to set the initial value. If they are, then this is not thread-safe 
but is accessed by multiple threads. If they are not, then it probably makes 
sense to just provide the size and timestamp as constructor arguments, no? At a 
minimum, it makes it clear when looking at the code that it's immutable and 
therefore threadsafe.


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the 

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545377#comment-16545377
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202721147
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
--- End diff --

This is a member variable that is mutable. Access to it is not protected 
within the code, which means that it is not thread-safe. Probably best to use a 
ConcurrentHashMap.


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545380#comment-16545380
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202705819
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
 ---
@@ -169,6 +179,28 @@
 .description("All FlowFiles that are received are routed to 
success")
 .build();
 
+public static final AllowableValue BY_TIMESTAMPS = new 
AllowableValue("timestamps", "Tracking Timestamps",
+"This strategy tracks the latest timestamp of listed entity to 
determine new/updated entities." +
+" Since it only tracks few timestamps, it can manage 
listing state efficiently." +
+" However, any newly added, or updated entity having 
timestamp older than the tracked latest timestamp can not be picked by this 
strategy." +
+" For example, such situation can happen in a file 
system if a file with old timestamp" +
+" is copied or moved into the target directory without 
its last modified timestamp being updated.");
+
+public static final AllowableValue BY_ENTITIES = new 
AllowableValue("entities", "Tracking Entities",
+"This strategy tracks information of all the listed entities 
within the latest 'Entity Tracking Time Window' to determine new/updated 
entities." +
+" See 'Entity Tracking Time Window' description for 
detail on how it works." +
--- End diff --

See 'Entity Tracking Time Window' description where?


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the already-listed entities
>  *** Having newer last-modified-timestamp
>  *** Having different size
>  ** Already listed entries those are old enough compared to 'Watch Time 
> Window' are discarded from the already-listed entries.
>  * Initial supporting target is Local file system, 

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545376#comment-16545376
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202715409
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545371#comment-16545371
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202714356
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545372#comment-16545372
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202714807
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+private ObjectMapper objectMapper = new ObjectMapper();
+private volatile Map alreadyListedEntities;
+
+private static final String NOTE = "Used by 'Tracking Entities' 
strategy.";
+public static final PropertyDescriptor TRACKING_STATE_CACHE = new 
PropertyDescriptor.Builder()
+.name("et-state-cache")
+.displayName("Entity Tracking State Cache")
+.description(format("Listed entities are stored in the 
specified cache storage" +
+" so that this processor can resume listing across 
NiFi restart or in case of primary node change." +
+" 'Tracking Entities' strategy require tracking 
information of all listed entities within the last 'Tracking Time Window'." +
+" To support large number of entities, the strategy 
uses DistributedMapCache instead of managed state." +
+" Cache key format is 
'ListedEntityTracker::{processorId}(::{nodeId})'." +
+" If it tracks per node listed entities, then the 
optional '::{nodeId}' part is added to manage state separately." +
+" E.g. cluster wide cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+" per node cache key = 
'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+" The stored cache content is Gzipped JSON string." +
+" The cache key will be deleted when target listing 
configuration is changed." +
+" %s", NOTE))
+.identifiesControllerService(DistributedMapCacheClient.class)

[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544648#comment-16544648
 ] 

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on the issue:

https://github.com/apache/nifi/pull/2876
  
Docs and unit tests are added. All todo items are done. This PR is now 
fully ready for review. Thanks!


> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the already-listed entities
>  *** Having newer last-modified-timestamp
>  *** Having different size
>  ** Already listed entries those are old enough compared to 'Watch Time 
> Window' are discarded from the already-listed entries.
>  * Initial supporting target is Local file system, FTP and SFTP



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors

2018-07-12 Thread Koji Kawamura (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541395#comment-16541395
 ] 

Koji Kawamura commented on NIFI-5406:
-

[~joewitt] Thanks for your comments. I've updated the PR to let user to choose 
listing strategy. It behaves as is by default setting.

> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Koji Kawamura
>Assignee: Koji Kawamura
>Priority: Major
>
> Current List processors (ListFile, ListFTP, ListSFTP ... etc) implementation 
> relies on file last modified timestamp to pick new or updated files. This 
> approach is efficient and lightweight in terms of state management, because 
> it only tracks latest modified timestamp and last executed timestamp. 
> However, timestamps do not work as expected in some file systems, causing 
> List processors missing files periodically. See NIFI-3332 comments for 
> details.
> In order to pick every entity that has not seen before or has been updated 
> since it had seen last time, we need another set of processors using 
> different approach, that is by tracking listed entities:
>  * Add new abstract processor AbstractWatchEntries similar to 
> AbstractListProcessor but uses different approach
>  * Target entities have: name (path), size and last-modified-timestamp
>  * Implementation Processors have following properties:
>  ** 'Watch Time Window' to limit the maximum time period to hold the already 
> listed entries. E.g. if set as '30min', the processor keeps entities listed 
> in the last 30 mins.
>  ** 'Minimum File Age' to defer listing entities potentially being written
>  * Any entity added but not listed ever having last-modified-timestamp older 
> than configured 'Watch Time Window' will not be listed. If user needs to pick 
> these items, they have to make 'Watch Time Window' longer. It also increases 
> the size of data the processor has to persist in the K/V store. Efficiency vs 
> reliability trade-off.
>  * The already-listed entities are persisted into one of supported K/V store 
> through DistributedMapCacheClient service. User can chose what KVS to use 
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with 
> persistence file).
>  * The reason to use KVS instead of ManagedState is, to avoid hammering 
> Zookeeper too much with frequently updating Zk node with large amount of 
> data. The number of already-listed entries can be huge depending on 
> use-cases. Also, we can compress entities with DistributedMapCacheClient as 
> it supports putting byte array, while ManagedState only supports Map String>.
>  * On each onTrigger:
>  ** Processor performs listing. Listed entries meeting any of the following 
> condition will be written to the 'success' output FlowFile:
>  *** Not exists in the already-listed entities
>  *** Having newer last-modified-timestamp
>  *** Having different size
>  ** Already listed entries those are old enough compared to 'Watch Time 
> Window' are discarded from the already-listed entries.
>  * Initial supporting target is Local file system, FTP and SFTP



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)