[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16702923#comment-16702923 ]

ASF subversion and git services commented on NIFI-5406:
---

Commit 30f2f4205121113c26bb00ac5a8697dffaeb8206 in nifi's branch refs/heads/master from [~ijokarumawak]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=30f2f42 ]

NIFI-5849: ListXXX can lose cluster state on processor restart

NIFI-5406 introduced the issue by trying to use the resetState variable for different purposes. AbstractListProcessor should have had a separate variable to control whether to clear state for the tracking entities strategy.

Signed-off-by: Pierre Villard

This closes #3189.

> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
> Priority: Major
> Fix For: 1.8.0
>
>
> The current List processor implementations (ListFile, ListFTP, ListSFTP, etc.)
> rely on the file's last-modified timestamp to pick up new or updated files. This
> approach is efficient and lightweight in terms of state management, because
> it only tracks the latest modified timestamp and the last executed timestamp.
> However, timestamps do not work as expected on some file systems, causing
> List processors to miss files periodically. See the NIFI-3332 comments for
> details.
> In order to pick up every entity that has not been seen before, or that has been
> updated since it was last seen, we need another set of processors that use a
> different approach: tracking listed entities.
> * Add a new abstract processor, AbstractWatchEntries, similar to
> AbstractListProcessor but using this different approach
> * Target entities have: name (path), size and last-modified timestamp
> * Implementation processors have the following properties:
> ** 'Watch Time Window' to limit the maximum time period for which already
> listed entries are held. E.g. if set to '30min', the processor keeps entities listed
> in the last 30 mins.
> ** 'Minimum File Age' to defer listing entities that may still be in the process of being written
> * Any entity that was added but never listed and whose last-modified timestamp is older
> than the configured 'Watch Time Window' will not be listed. If users need to pick up
> these items, they have to make 'Watch Time Window' longer. This also increases
> the size of the data the processor has to persist in the K/V store: an efficiency vs.
> reliability trade-off.
> * The already-listed entities are persisted into one of the supported K/V stores
> through the DistributedMapCacheClient service. Users can choose which KVS to use
> from HBase, Redis, Couchbase and File (DistributedMapCacheServer with a
> persistence file).
> * The reason to use a KVS instead of ManagedState is to avoid hammering
> ZooKeeper by frequently updating a ZK node with a large amount of
> data. The number of already-listed entries can be huge depending on the
> use case. Also, we can compress entities with DistributedMapCacheClient, as
> it supports putting byte arrays, while ManagedState only supports Map<String, String>.
> * On each onTrigger:
> ** The processor performs listing. Listed entries meeting any of the following
> conditions will be written to the 'success' output FlowFile:
> *** Does not exist in the already-listed entities
> *** Has a newer last-modified timestamp
> *** Has a different size
> ** Already-listed entries that are old enough compared to 'Watch Time
> Window' are discarded from the already-listed entries.
> * Initial supported targets are the local file system, FTP and SFTP

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
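The per-onTrigger rules in the issue description above can be sketched in Java as follows. This is a minimal, in-memory illustration under stated assumptions: the class names `Entity` and `EntityTrackerSketch` are hypothetical (not NiFi's API), tracked entities live in a `ConcurrentHashMap` rather than in a DistributedMapCacheClient-backed store, and both the window check and the eviction are based on each entity's last-modified timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified entity: path, size in bytes, last-modified timestamp in ms.
final class Entity {
    final String path;
    final long size;
    final long timestamp;

    Entity(String path, long size, long timestamp) {
        this.path = path;
        this.size = size;
        this.timestamp = timestamp;
    }
}

// Hypothetical, in-memory sketch of the entity-tracking strategy; not NiFi's API.
final class EntityTrackerSketch {
    // path -> entity as it looked when it was last emitted
    private final Map<String, Entity> alreadyListed = new ConcurrentHashMap<>();
    private final long windowMillis; // assumed meaning of 'Watch Time Window'
    private final long minAgeMillis; // assumed meaning of 'Minimum File Age'

    EntityTrackerSketch(long windowMillis, long minAgeMillis) {
        this.windowMillis = windowMillis;
        this.minAgeMillis = minAgeMillis;
    }

    /** One onTrigger pass: returns the entities that would be written to 'success'. */
    List<Entity> track(List<Entity> listing, long now) {
        long windowStart = now - windowMillis;
        List<Entity> toEmit = new ArrayList<>();
        for (Entity e : listing) {
            if (now - e.timestamp < minAgeMillis) {
                continue; // possibly still being written; picked up on a later run
            }
            if (e.timestamp < windowStart) {
                continue; // older than the window: never listed
            }
            Entity previous = alreadyListed.get(e.path);
            boolean emit = previous == null             // not in the already-listed entities
                    || e.timestamp > previous.timestamp // newer last-modified timestamp
                    || e.size != previous.size;         // different size
            if (emit) {
                toEmit.add(e);
                alreadyListed.put(e.path, e);
            }
        }
        // Discard tracked entries that have fallen out of the window.
        alreadyListed.values().removeIf(tracked -> tracked.timestamp < windowStart);
        return toEmit;
    }

    int trackedCount() {
        return alreadyListed.size();
    }
}
```

For example, with a 30-minute window a file modified one second ago is emitted once; listing it again with the same size and timestamp emits nothing, while a different size or a newer timestamp emits it again.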
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546954#comment-16546954 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2876
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546953#comment-16546953 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/2876

    All of my testing looks good as well. +1 merged to master. Thanks for the improvement, @ijokarumawak!
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546911#comment-16546911 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/2876

    @ijokarumawak thanks for the updates! Code looks good! I will run a few tests and if all looks good will merge to master. Thanks!
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546089#comment-16546089 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2876

    @markap14 I believe all review comments have been addressed now. I also fixed a few things found during tests in a clustered environment, and changed the default initial listing target from 'window' to 'all' so that it behaves the same as the existing listing does. I hope it's ready to be merged. Thanks for reviewing and for the insightful feedback!
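The change of the default initial listing target from 'window' to 'all' can be illustrated with the sketch below. It assumes that 'window' means "on the first run, list only entities modified within the tracking time window" and that 'all' means "list everything present on the first run"; the enum `InitialListingTarget` and the method `listOnInitialRun` are hypothetical names, not the PR's code.

```java
// Hypothetical names illustrating the assumed meaning of the 'all' and 'window' values.
enum InitialListingTarget { ALL, WINDOW }

final class InitialListingSketch {
    /**
     * Decides whether an entity should be listed on the very first run,
     * when no already-listed entities have been tracked yet.
     */
    static boolean listOnInitialRun(InitialListingTarget target, long timestamp,
                                    long now, long windowMillis) {
        switch (target) {
            case ALL:
                // Everything currently present is listed, as the existing
                // timestamp-based listing does when it has no state yet.
                return true;
            case WINDOW:
                // Only entities modified within the tracking time window.
                return timestamp >= now - windowMillis;
            default:
                throw new IllegalArgumentException("Unknown target: " + target);
        }
    }
}
```

With 'all' as the default, an old file that already exists when the processor first runs is still emitted, which matches the behavior users of the timestamp-based strategy would expect.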
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546087#comment-16546087 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2876#discussion_r202906360

    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java ---
    @@ -0,0 +1,322 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processor.util.list;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +import java.util.function.Consumer;
    +import java.util.function.Function;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
    +import java.util.zip.GZIPInputStream;
    +import java.util.zip.GZIPOutputStream;
    +
    +import static java.lang.String.format;
    +import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
    +
    +public class ListedEntityTracker {
    +
    +    private ObjectMapper objectMapper = new ObjectMapper();
    +    private volatile Map alreadyListedEntities;
    +
    +    private static final String NOTE = "Used by 'Tracking Entities' strategy.";
    +    public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
    +            .name("et-state-cache")
    +            .displayName("Entity Tracking State Cache")
    +            .description(format("Listed entities are stored in the specified cache storage"
    +                    + " so that this processor can resume listing across NiFi restart or in case of primary node change."
    +                    + " 'Tracking Entities' strategy require tracking information of all listed entities within the last 'Tracking Time Window'."
    +                    + " To support large number of entities, the strategy uses DistributedMapCache instead of managed state."
    +                    + " Cache key format is 'ListedEntityTracker::{processorId}(::{nodeId})'."
    +                    + " If it tracks per node listed entities, then the optional '::{nodeId}' part is added to manage state separately."
    +                    + " E.g. cluster wide cache key = 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b',"
    +                    + " per node cache key = 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'"
    +                    + " The stored cache content is Gzipped JSON string."
    +                    + " The cache key will be deleted when target listing configuration is changed."
    +                    + " %s", NOTE))
    +            .identifiesControllerService(DistributedMapCacheClient.class)
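The cache key format and the gzipped JSON payload described in the TRACKING_STATE_CACHE property above can be sketched with JDK classes only. The helper names below are hypothetical; JSON (de)serialization, which the diff's imports suggest is done with Jackson's `ObjectMapper`, is left out, so the sketch simply compresses and decompresses an already serialized JSON string.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helpers; not the actual ListedEntityTracker implementation.
final class TrackingCacheSketch {

    /** nodeId == null means cluster-wide tracking; otherwise a per-node key is built. */
    static String cacheKey(String processorId, String nodeId) {
        String key = "ListedEntityTracker::" + processorId;
        return nodeId == null ? key : key + "::" + nodeId;
    }

    /** Compresses an already serialized JSON string into gzip bytes. */
    static byte[] gzip(String json) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the GZIPOutputStream writes the gzip trailer before we read the bytes.
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Decompresses gzip bytes back into the JSON string. */
    static String gunzip(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                bytes.write(buffer, 0, n);
            }
            return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Compressing before calling the cache client is what makes the byte-array support of DistributedMapCacheClient useful here, since the number of tracked entities can be large.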
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546074#comment-16546074 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2876#discussion_r202902757

    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java ---
    @@ -0,0 +1,322 @@
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546070#comment-16546070 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2876#discussion_r202902116

    --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntity.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processor.util.list;
    +
    +public class ListedEntity {
    +    /**
    +     * Milliseconds.
    +     */
    +    private long timestamp;
    +    /**
    +     * Bytes.
    +     */
    +    private long size;
    +
    +    public void setTimestamp(long timestamp) {
    --- End diff --

    You are right. I was using final fields and a constructor, but I got an exception from Jackson during deserialization, so I added setters. I've now added Jackson annotations so that the custom constructor is used for deserialization. Now it's clear that those fields are immutable.
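An immutable ListedEntity along the lines described in the reply above might look like the sketch below: final fields set through a constructor, getters, and no setters. Jackson's `@JsonCreator` (on the constructor) and `@JsonProperty` (on each parameter) are the usual way to let Jackson deserialize through such a constructor; they appear only as comments here so the sketch compiles without Jackson, and the exact annotations used in the PR are not shown in this thread.

```java
// Sketch of an immutable ListedEntity: final fields, constructor, getters, no setters.
// With Jackson on the classpath, the constructor would be annotated with @JsonCreator
// and each parameter with @JsonProperty(...); they are commented out so this compiles
// without Jackson.
final class ListedEntity {
    /** Milliseconds. */
    private final long timestamp;
    /** Bytes. */
    private final long size;

    // @JsonCreator
    public ListedEntity(/* @JsonProperty("timestamp") */ long timestamp,
                        /* @JsonProperty("size") */ long size) {
        this.timestamp = timestamp;
        this.size = size;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public long getSize() {
        return size;
    }
}
```

Because Jackson serializes through the public getters and deserializes through the annotated constructor, no mutators are needed on either side.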
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546068#comment-16546068 ] ASF GitHub Bot commented on NIFI-5406: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202901813 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.list; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static java.lang.String.format; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS; + +public class ListedEntityTracker { + +private ObjectMapper objectMapper = new ObjectMapper(); +private volatile Map alreadyListedEntities; --- End diff -- Updated to use ConcurrentHashMap. Thanks. 
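The tracking rules from the issue description (emit an entity if it is not tracked yet, has a newer last-modified timestamp, or has a different size; drop tracked entries older than the time window; ignore never-listed entities already older than the window) can be sketched around a ConcurrentHashMap, as adopted in this review thread. This is a minimal, hypothetical sketch: `EntityTrackerSketch`, its `Entity` class and the `track`/`trackedCount` methods are illustrative names, not the API of NiFi's `ListedEntityTracker`, and persistence to a DistributedMapCacheClient is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EntityTrackerSketch {

    // Hypothetical minimal entity: identifier (path), size and last-modified timestamp in millis.
    public static final class Entity {
        final String identifier;
        final long size;
        final long timestamp;

        public Entity(String identifier, long size, long timestamp) {
            this.identifier = identifier;
            this.size = size;
            this.timestamp = timestamp;
        }
    }

    // Already-listed entities keyed by identifier. ConcurrentHashMap lets other threads
    // read or clear the map safely while a listing cycle runs.
    private final Map<String, Entity> alreadyListed = new ConcurrentHashMap<>();
    private final long windowMillis;

    public EntityTrackerSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Returns the entities that should be emitted for this listing cycle.
    public List<Entity> track(List<Entity> listed, long nowMillis) {
        final long cutoff = nowMillis - windowMillis;

        // Discard tracked entries that fell out of the tracking time window.
        alreadyListed.values().removeIf(tracked -> tracked.timestamp < cutoff);

        List<Entity> toEmit = new ArrayList<>();
        for (Entity entity : listed) {
            // Entities older than the window are never listed.
            if (entity.timestamp < cutoff) {
                continue;
            }
            Entity previous = alreadyListed.get(entity.identifier);
            boolean isNew = previous == null;
            if (isNew || entity.timestamp > previous.timestamp || entity.size != previous.size) {
                toEmit.add(entity);
                alreadyListed.put(entity.identifier, entity);
            }
        }
        return toEmit;
    }

    public int trackedCount() {
        return alreadyListed.size();
    }
}
```

The concurrent map only makes individual reads and writes thread-safe; the check-then-put inside `track` is not atomic, so the sketch assumes a single thread runs each listing cycle.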
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546067#comment-16546067 ] ASF GitHub Bot commented on NIFI-5406: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202901766 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.list; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static java.lang.String.format; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS; + +public class ListedEntityTracker { + +private ObjectMapper objectMapper = new ObjectMapper(); --- End diff -- Done. 
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546066#comment-16546066 ] ASF GitHub Bot commented on NIFI-5406: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202901745 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.list; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static java.lang.String.format; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS; + +public class ListedEntityTracker { + +private ObjectMapper objectMapper = new ObjectMapper(); +private volatile Map alreadyListedEntities; + +private static final String NOTE = "Used by 'Tracking Entities' strategy."; +public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder() +.name("et-state-cache") 
+.displayName("Entity Tracking State Cache") +.description(format("Listed entities are stored in the specified cache storage" + +" so that this processor can resume listing across NiFi restart or in case of primary node change." + +" 'Tracking Entities' strategy require tracking information of all listed entities within the last 'Tracking Time Window'." + +" To support large number of entities, the strategy uses DistributedMapCache instead of managed state." + +" Cache key format is 'ListedEntityTracker::{processorId}(::{nodeId})'." + +" If it tracks per node listed entities, then the optional '::{nodeId}' part is added to manage state separately." + +" E.g. cluster wide cache key = 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," + +" per node cache key = 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" + +" The stored cache content is Gzipped JSON string." + +" The cache key will be deleted when target listing configuration is changed." + +" %s", NOTE)) +.identifiesControllerService(DistributedMapCacheClient.class)
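The cache key format and the "Gzipped JSON string" cache content described in this property could be produced roughly as follows. This is a JDK-only sketch: the class `TrackingCacheKeySketch` and its methods are hypothetical, and any JSON payload passed to `gzip` is an assumed shape, since the diff does not show how `ListedEntityTracker` serializes entities (it imports Jackson's `ObjectMapper`, presumably for that purpose).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TrackingCacheKeySketch {

    // Builds 'ListedEntityTracker::{processorId}' for cluster-wide tracking, or
    // 'ListedEntityTracker::{processorId}::{nodeId}' when entities are tracked per node.
    public static String cacheKey(String processorId, String nodeId) {
        String key = "ListedEntityTracker::" + processorId;
        return nodeId == null ? key : key + "::" + nodeId;
    }

    // Gzips a JSON string into bytes that a byte-array cache value could hold.
    public static byte[] gzip(String json) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the GZIPOutputStream writes the gzip trailer before we read the bytes.
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }

    // Reverses gzip(): decompresses the bytes and decodes them as UTF-8.
    public static String gunzip(byte[] compressed) throws IOException {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }
}
```

Compressing before storing is what makes the byte-array support of DistributedMapCacheClient useful here, as the issue description points out for large numbers of tracked entities.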
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546064#comment-16546064 ] ASF GitHub Bot commented on NIFI-5406: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202901658 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java --- @@ -0,0 +1,322 @@
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546065#comment-16546065 ] ASF GitHub Bot commented on NIFI-5406: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202901676 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java --- @@ -0,0 +1,322 @@
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546063#comment-16546063 ] ASF GitHub Bot commented on NIFI-5406: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202901585 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java --- @@ -0,0 +1,322 @@
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546062#comment-16546062 ] ASF GitHub Bot commented on NIFI-5406: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202901570 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java --- @@ -0,0 +1,322 @@
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546061#comment-16546061 ] ASF GitHub Bot commented on NIFI-5406: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202901279 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java --- @@ -169,6 +179,28 @@ .description("All FlowFiles that are received are routed to success") .build(); +public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps", +"This strategy tracks the latest timestamp of listed entity to determine new/updated entities." + +" Since it only tracks few timestamps, it can manage listing state efficiently." + +" However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." + +" For example, such situation can happen in a file system if a file with old timestamp" + +" is copied or moved into the target directory without its last modified timestamp being updated."); + +public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities", +"This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." + +" See 'Entity Tracking Time Window' description for detail on how it works." + --- End diff -- I wanted to guide users to the 'Entity Tracking Time Window' property's description. Updated the text.
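The limitation described for 'Tracking Timestamps' (a file copied or moved in with an old last-modified timestamp is never picked up) can be illustrated with two deliberately simplified trackers. This is a hypothetical sketch: `ListingStrategySketch`, `TimestampTracker`, `EntityTracker` and `FileInfo` are illustrative names, and neither tracker mirrors AbstractListProcessor's actual timestamp handling, which is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ListingStrategySketch {

    // Hypothetical file record: path and last-modified timestamp in millis.
    public static final class FileInfo {
        final String path;
        final long lastModified;

        public FileInfo(String path, long lastModified) {
            this.path = path;
            this.lastModified = lastModified;
        }
    }

    // Simplified 'Tracking Timestamps' idea: remember only the latest timestamp seen.
    public static final class TimestampTracker {
        private long latest = Long.MIN_VALUE;

        public List<String> list(List<FileInfo> files) {
            List<String> picked = new ArrayList<>();
            long newLatest = latest;
            for (FileInfo f : files) {
                // Anything not newer than the stored timestamp is skipped.
                if (f.lastModified > latest) {
                    picked.add(f.path);
                    newLatest = Math.max(newLatest, f.lastModified);
                }
            }
            latest = newLatest;
            return picked;
        }
    }

    // Simplified 'Tracking Entities' idea: remember every listed path and its timestamp.
    public static final class EntityTracker {
        private final Map<String, Long> seen = new HashMap<>();

        public List<String> list(List<FileInfo> files) {
            List<String> picked = new ArrayList<>();
            for (FileInfo f : files) {
                Long previous = seen.get(f.path);
                // Unknown paths are picked regardless of how old their timestamp is.
                if (previous == null || f.lastModified > previous) {
                    picked.add(f.path);
                    seen.put(f.path, f.lastModified);
                }
            }
            return picked;
        }
    }
}
```

With a first listing of `a` (timestamp 200) followed by a listing of `a` plus a copied-in `b` that kept its older timestamp 100, the timestamp tracker returns nothing on the second run while the entity tracker returns `b`. The price is state that grows with the number of tracked entities, which is why the real strategy bounds it with 'Entity Tracking Time Window'.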
> Add new listing strategy by tracking listed entities to List processors
> ---
>
> Key: NIFI-5406
> URL: https://issues.apache.org/jira/browse/NIFI-5406
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
> Priority: Major
>
> The current List processors (ListFile, ListFTP, ListSFTP, etc.) rely on the file's
> last-modified timestamp to pick up new or updated files. This approach is efficient and
> lightweight in terms of state management, because it only tracks the latest modified
> timestamp and the last executed timestamp. However, timestamps do not work as expected in
> some file systems, causing List processors to miss files periodically. See the NIFI-3332
> comments for details.
> In order to pick up every entity that has not been seen before, or has been updated since it
> was last seen, we need another set of processors using a different approach, namely tracking
> listed entities:
> * Add a new abstract processor, AbstractWatchEntries, similar to AbstractListProcessor but
>   using a different approach
> * Target entities have: name (path), size and last-modified-timestamp
> * Implementation processors have the following properties:
> ** 'Watch Time Window' to limit the maximum time period for holding already-listed entries.
>    E.g. if set to '30min', the processor keeps the entities listed in the last 30 minutes.
> ** 'Minimum File Age' to defer listing entities that may still be being written
> * Any entity that was added but never listed, and whose last-modified-timestamp is older than
>   the configured 'Watch Time Window', will not be listed. If users need to pick up such items,
>   they have to make 'Watch Time Window' longer. That also increases the amount of data the
>   processor has to persist in the K/V store: an efficiency vs. reliability trade-off.
> * The already-listed entities are persisted into one of the supported K/V stores through the
>   DistributedMapCacheClient service. Users can choose which KVS to use: HBase, Redis,
>   Couchbase or File (DistributedMapCacheServer with a persistence file).
> * The reason to use a KVS instead of ManagedState is to avoid hammering ZooKeeper by
>   frequently updating a ZK node with a large amount of data. The number of already-listed
>   entries can be huge depending on the use case. Also, we can compress entities with
>   DistributedMapCacheClient, as it supports putting byte arrays, while ManagedState only
>   supports Map<String, String>.
> * On each onTrigger:
> ** The processor performs listing. Listed entries meeting any of the following conditions
>    will be written to the 'success' output FlowFile:
> *** Not present in the already-listed entities
> *** Having a newer last-modified-timestamp
> *** Having a different size
> ** Already-listed entries that are old enough compared to 'Watch Time Window' are discarded
>    from the already-listed
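The onTrigger rules in the issue description can be illustrated with a small, self-contained Java sketch. It is a simplified model under stated assumptions, not NiFi's ListedEntityTracker: the names `TrackedEntry` and `EntityTrackingSketch` are hypothetical, state is held in an in-memory `HashMap` instead of a DistributedMapCacheClient, and 'Minimum File Age' is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical entity model: name (path), last-modified timestamp and size.
class TrackedEntry {
    final String name;
    final long timestamp; // milliseconds
    final long size;      // bytes

    TrackedEntry(String name, long timestamp, long size) {
        this.name = name;
        this.timestamp = timestamp;
        this.size = size;
    }
}

// Hypothetical sketch of the "track listed entities" rules; not NiFi code.
class EntityTrackingSketch {
    private final long windowMillis;
    // name -> entry as it looked when it was last listed
    private final Map<String, TrackedEntry> alreadyListed = new HashMap<>();

    EntityTrackingSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns the entries that would be written to 'success' on this trigger. */
    List<TrackedEntry> onTrigger(List<TrackedEntry> listing, long now) {
        long windowStart = now - windowMillis;
        List<TrackedEntry> emitted = new ArrayList<>();
        for (TrackedEntry e : listing) {
            // Entities older than the window are never listed.
            if (e.timestamp < windowStart) {
                continue;
            }
            TrackedEntry prev = alreadyListed.get(e.name);
            boolean isNew = prev == null;
            boolean isUpdated = !isNew && (e.timestamp > prev.timestamp || e.size != prev.size);
            if (isNew || isUpdated) {
                emitted.add(e);
                alreadyListed.put(e.name, e);
            }
        }
        // Discard tracked entries that have fallen out of the window.
        alreadyListed.values().removeIf(e -> e.timestamp < windowStart);
        return emitted;
    }

    int trackedCount() {
        return alreadyListed.size();
    }
}
```

Shrinking `windowMillis` reduces the tracked state but makes it more likely that a late-arriving entity with an old timestamp is skipped, which is the efficiency vs. reliability trade-off the description mentions.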
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545456#comment-16545456 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202748684

--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import static java.lang.String.format;
+import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+
+public class ListedEntityTracker {
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+    private volatile Map<String, ListedEntity> alreadyListedEntities;
+
+    private static final String NOTE = "Used by 'Tracking Entities' strategy.";
+    public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
+            .name("et-state-cache")
+            .displayName("Entity Tracking State Cache")
+            .description(format("Listed entities are stored in the specified cache storage" +
+                    " so that this processor can resume listing across NiFi restart or in case of primary node change." +
+                    " 'Tracking Entities' strategy require tracking information of all listed entities within the last 'Tracking Time Window'." +
+                    " To support large number of entities, the strategy uses DistributedMapCache instead of managed state." +
+                    " Cache key format is 'ListedEntityTracker::{processorId}(::{nodeId})'." +
+                    " If it tracks per node listed entities, then the optional '::{nodeId}' part is added to manage state separately." +
+                    " E.g. cluster wide cache key = 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," +
+                    " per node cache key = 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" +
+                    " The stored cache content is Gzipped JSON string." +
+                    " The cache key will be deleted when target listing configuration is changed." +
+                    " %s", NOTE))
+            .identifiesControllerService(DistributedMapCacheClient.class)
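The cache key format and the "Gzipped JSON string" payload described in TRACKING_STATE_CACHE can be sketched with the JDK alone. This is a hypothetical helper, not part of NiFi: it assumes the node ID is simply appended when state is tracked per node, as the description's examples show, and it compresses an already-serialized JSON string (the reviewed class imports Jackson's ObjectMapper, presumably for producing that JSON).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper illustrating the documented key format and gzip payload.
class TrackingCacheSketch {
    private static final String KEY_PREFIX = "ListedEntityTracker::";

    // ListedEntityTracker::{processorId}(::{nodeId}); nodeId == null means cluster-wide state.
    static String cacheKey(String processorId, String nodeId) {
        String key = KEY_PREFIX + processorId;
        return nodeId == null ? key : key + "::" + nodeId;
    }

    // Compresses a JSON string into the byte[] value that would be put into the cache.
    static byte[] gzip(String json) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Read the buffer only after close(), so the gzip trailer has been written.
        return bytes.toByteArray();
    }

    // Reverses gzip(): decompresses cached bytes back into the JSON string (Java 9+).
    static String gunzip(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, `cacheKey("8dda2321-0164-1000-50fa-3042fe7d6a7b", "nifi-node3")` yields the per-node key quoted in the description, and `gunzip(gzip(json))` returns the original string.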
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545373#comment-16545373 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202716202

--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java ---
@@ -0,0 +1,322 @@
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545378#comment-16545378 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202720481

--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java ---
@@ -0,0 +1,322 @@
+public class ListedEntityTracker {
+
+    private ObjectMapper objectMapper = new ObjectMapper();
--- End diff --

Should probably mark member variable as `final` if not changing.
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545374#comment-16545374 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202719150

--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java ---
@@ -0,0 +1,322 @@
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545375#comment-16545375 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202719634

--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java ---
@@ -0,0 +1,322 @@
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545379#comment-16545379 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202722240

--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntity.java ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+public class ListedEntity {
+    /**
+     * Milliseconds.
+     */
+    private long timestamp;
+    /**
+     * Bytes.
+     */
+    private long size;
+
+    public void setTimestamp(long timestamp) {
--- End diff --

I don't think these are ever called other than immediately after creating the object, to set the initial value. If they are, then this class is not thread-safe, yet it is accessed by multiple threads. If they are not, then it probably makes sense to just provide the size and timestamp as constructor arguments, no? At a minimum, that makes it clear when reading the code that the object is immutable and therefore thread-safe.
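The constructor-based alternative suggested in this review could look like the following sketch. It is illustrative only and not necessarily the code that was eventually merged. One caveat, stated as an assumption: if instances are deserialized with Jackson (the tracker imports ObjectMapper), dropping the no-arg constructor and setters would require something like `@JsonCreator` with `@JsonProperty` on the constructor parameters.

```java
// Sketch of an immutable ListedEntity: both values are fixed at construction,
// and final fields guarantee other threads see them fully initialized.
final class ListedEntity {
    /** Milliseconds. */
    private final long timestamp;
    /** Bytes. */
    private final long size;

    ListedEntity(long timestamp, long size) {
        this.timestamp = timestamp;
        this.size = size;
    }

    long getTimestamp() {
        return timestamp;
    }

    long getSize() {
        return size;
    }
}
```

An "updated" entity is then represented by replacing the map entry with a new instance rather than mutating the existing one.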
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545377#comment-16545377 ]

ASF GitHub Bot commented on NIFI-5406:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2876#discussion_r202721147

--- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java ---
@@ -0,0 +1,322 @@
+public class ListedEntityTracker {
+
+    private ObjectMapper objectMapper = new ObjectMapper();
+    private volatile Map<String, ListedEntity> alreadyListedEntities;
--- End diff --

This is a member variable that is mutable. Access to it is not protected within the code, which means that it is not thread-safe. Probably best to use a ConcurrentHashMap.
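Following this suggestion, together with the earlier one about marking unchanging members `final`, a tracker's map could be declared as a `final` ConcurrentHashMap. The sketch below is hypothetical (the class and method names are not from the PR) and tracks only a timestamp per entity name. It uses `compute`, which ConcurrentHashMap performs atomically per key, so a check-then-update for one entity cannot interleave with another thread's update of the same entity.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical thread-safe tracker sketch; not the PR's ListedEntityTracker.
class ConcurrentTrackerSketch {
    // final: the reference never changes; ConcurrentHashMap: safe for concurrent access.
    private final ConcurrentMap<String, Long> alreadyListed = new ConcurrentHashMap<>();

    /** Records the timestamp; returns true if the entity is new or has a newer timestamp. */
    boolean recordIfNewer(String name, long timestamp) {
        boolean[] changed = {false};
        // compute() applies the remapping function atomically for this key.
        alreadyListed.compute(name, (key, previous) -> {
            if (previous == null || timestamp > previous) {
                changed[0] = true;
                return timestamp;
            }
            return previous;
        });
        return changed[0];
    }

    /** Drops entities whose recorded timestamp is older than the cutoff. */
    void evictOlderThan(long cutoff) {
        alreadyListed.values().removeIf(ts -> ts < cutoff);
    }

    int size() {
        return alreadyListed.size();
    }
}
```

A design note: the reviewed field is `volatile`, which suggests it is reassigned wholesale (for example after loading state from the cache). With a `final` map that becomes `clear()` followed by `putAll(...)`, which is not atomic as a whole, so callers would still need to avoid listing concurrently with a reload.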
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545380#comment-16545380 ] ASF GitHub Bot commented on NIFI-5406: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202705819 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java --- @@ -169,6 +179,28 @@ .description("All FlowFiles that are received are routed to success") .build(); +public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps", +"This strategy tracks the latest timestamp of listed entity to determine new/updated entities." + +" Since it only tracks few timestamps, it can manage listing state efficiently." + +" However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." + +" For example, such situation can happen in a file system if a file with old timestamp" + +" is copied or moved into the target directory without its last modified timestamp being updated."); + +public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities", +"This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." + +" See 'Entity Tracking Time Window' description for detail on how it works." + --- End diff -- See 'Entity Tracking Time Window' description where?
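To illustrate the difference the BY_TIMESTAMPS and BY_ENTITIES descriptions draw, here is a simplified, hypothetical model; it is not NiFi's actual AbstractListProcessor logic, which has additional handling not modeled here. Entities are reduced to a name mapped to a last-modified timestamp, and the ListingStrategies class and its method names are illustrative. It shows the case named in the BY_TIMESTAMPS description: a file copied into the directory with its old timestamp preserved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the two strategies; entities are reduced to
// name -> last-modified timestamp. Not NiFi's actual implementation.
final class ListingStrategies {

    // 'Tracking Timestamps': list only entities newer than the latest tracked timestamp.
    static List<String> byTimestamps(Map<String, Long> current, long latestListedTimestamp) {
        List<String> listed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : current.entrySet()) {
            if (entry.getValue() > latestListedTimestamp) {
                listed.add(entry.getKey());
            }
        }
        return listed;
    }

    // 'Tracking Entities': list entities not tracked yet, or tracked with an older timestamp.
    static List<String> byEntities(Map<String, Long> current, Map<String, Long> alreadyListed) {
        List<String> listed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : current.entrySet()) {
            Long previous = alreadyListed.get(entry.getKey());
            if (previous == null || entry.getValue() > previous) {
                listed.add(entry.getKey());
            }
        }
        return listed;
    }
}
```

For example, with a.txt already listed at timestamp 100 and b.txt copied in with a preserved timestamp of 50, byTimestamps(current, 100) returns an empty list, while byEntities(current, alreadyListed) returns [b.txt]. The price of the second approach is having to store every tracked entity rather than a single timestamp.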
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545376#comment-16545376 ] ASF GitHub Bot commented on NIFI-5406: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202715409 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java --- @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.list; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import static java.lang.String.format; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS; + +public class ListedEntityTracker { + +private ObjectMapper objectMapper = new ObjectMapper(); +private volatile Map alreadyListedEntities; + +private static final String NOTE = "Used by 'Tracking Entities' strategy."; +public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder() +.name("et-state-cache") 
+.displayName("Entity Tracking State Cache") +.description(format("Listed entities are stored in the specified cache storage" + +" so that this processor can resume listing across NiFi restart or in case of primary node change." + +" 'Tracking Entities' strategy require tracking information of all listed entities within the last 'Tracking Time Window'." + +" To support large number of entities, the strategy uses DistributedMapCache instead of managed state." + +" Cache key format is 'ListedEntityTracker::{processorId}(::{nodeId})'." + +" If it tracks per node listed entities, then the optional '::{nodeId}' part is added to manage state separately." + +" E.g. cluster wide cache key = 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b'," + +" per node cache key = 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'" + +" The stored cache content is Gzipped JSON string." + +" The cache key will be deleted when target listing configuration is changed." + +" %s", NOTE)) +.identifiesControllerService(DistributedMapCacheClient.class)
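The property description above specifies the cache key format 'ListedEntityTracker::{processorId}(::{nodeId})' and says the stored value is a Gzipped JSON string. A minimal JDK-only sketch of both pieces follows. The TrackingCacheFormat class is a hypothetical name, the JSON layout in the test is an assumption, and the PR itself appears to serialize entities with Jackson's ObjectMapper (per its imports) rather than handling a pre-built string. A null nodeId yields the cluster-wide key; otherwise the '::{nodeId}' suffix is appended for per-node state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper reproducing the cache key format and the Gzipped JSON value
// described for 'Entity Tracking State Cache'.
final class TrackingCacheFormat {
    private static final String PREFIX = "ListedEntityTracker::";

    // nodeId == null gives the cluster-wide key; otherwise a per-node key.
    static String cacheKey(String processorId, String nodeId) {
        String key = PREFIX + processorId;
        return nodeId == null ? key : key + "::" + nodeId;
    }

    // Compresses a JSON string into the byte[] value stored in the cache.
    static byte[] gzip(String json) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Read only after close(), which writes the gzip trailer.
        return bytes.toByteArray();
    }

    // Restores the JSON string from a cached byte[] value.
    static String gunzip(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the processor ID from the description, cacheKey("8dda2321-0164-1000-50fa-3042fe7d6a7b", "nifi-node3") produces 'ListedEntityTracker::8dda2321-0164-1000-50fa-3042fe7d6a7b::nifi-node3'. Storing a compressed byte[] relies on DistributedMapCacheClient accepting byte arrays, which managed state (string key/value pairs) does not offer directly.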
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545371#comment-16545371 ] ASF GitHub Bot commented on NIFI-5406: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202714356 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java --- @@ -0,0 +1,322 @@
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545372#comment-16545372 ] ASF GitHub Bot commented on NIFI-5406: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2876#discussion_r202714807 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java --- @@ -0,0 +1,322 @@
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544648#comment-16544648 ] ASF GitHub Bot commented on NIFI-5406: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/2876 Docs and unit tests have been added. All TODO items are done. This PR is now fully ready for review. Thanks! > Add new listing strategy by tracking listed entities to List processors > --- > > Key: NIFI-5406 > URL: https://issues.apache.org/jira/browse/NIFI-5406 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Koji Kawamura >Assignee: Koji Kawamura >Priority: Major > > The current List processors (ListFile, ListFTP, ListSFTP, etc.) rely > on the file last-modified timestamp to pick up new or updated files. This > approach is efficient and lightweight in terms of state management, because > it only tracks the latest modified timestamp and the last executed timestamp. > However, timestamps do not work as expected in some file systems, causing > List processors to miss files periodically. See NIFI-3332 comments for > details. > In order to pick up every entity that has not been seen before or has been updated > since it was last seen, we need another set of processors using a > different approach, namely tracking listed entities: > * Add a new abstract processor AbstractWatchEntries, similar to > AbstractListProcessor but using a different approach > * Target entities have: name (path), size and last-modified-timestamp > * Implementation processors have the following properties: > ** 'Watch Time Window' to limit the maximum time period for holding already-listed > entries. E.g. if set to '30min', the processor keeps entities listed > in the last 30 mins. > ** 'Minimum File Age' to defer listing entities that may still be being written > * Any entity that was added but never listed and whose last-modified timestamp is older > than the configured 'Watch Time Window' will not be listed.
If users need to pick up > these items, they have to make 'Watch Time Window' longer, which also increases > the size of data the processor has to persist in the K/V store: an efficiency vs. > reliability trade-off. > * The already-listed entities are persisted into one of the supported K/V stores > through the DistributedMapCacheClient service. Users can choose which KVS to use > from HBase, Redis, Couchbase and File (DistributedMapCacheServer with a > persistence file). > * The reason to use a KVS instead of ManagedState is to avoid hammering > ZooKeeper with frequent updates to a Zk node holding a large amount of > data. The number of already-listed entries can be huge depending on the > use case. Also, we can compress entities with DistributedMapCacheClient, as > it supports putting byte arrays, while ManagedState only supports Map<String, String>. > * On each onTrigger: > ** The processor performs a listing. Listed entries meeting any of the following > conditions will be written to the 'success' output FlowFile: > *** Does not exist in the already-listed entities > *** Has a newer last-modified-timestamp > *** Has a different size > ** Already-listed entries that are older than the 'Watch Time > Window' are discarded from the already-listed entries. > * The initial supported targets are the local file system, FTP and SFTP -- This message was sent by Atlassian JIRA (v7.6.3#76005)
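The onTrigger steps in the issue description can be sketched as a single listing pass. This is a single-threaded, hypothetical model (EntityWindowTracker, Entity and listingPass are illustrative names). It assumes timestamps in epoch milliseconds and treats 'Minimum File Age' and 'Watch Time Window' as plain millisecond durations. An entity is emitted if it is not yet tracked, has a newer last-modified timestamp, or has a different size. Entities younger than the minimum age or older than the window are skipped, and tracked entries that fall outside the window are evicted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded sketch of one 'Tracking Entities' listing pass.
final class EntityWindowTracker {

    static final class Entity {
        final String name;    // path or other identifier
        final long timestamp; // last-modified, epoch millis
        final long size;

        Entity(String name, long timestamp, long size) {
            this.name = name;
            this.timestamp = timestamp;
            this.size = size;
        }
    }

    private final Map<String, Entity> alreadyListed = new HashMap<>();
    private final long windowMillis; // 'Watch Time Window'
    private final long minAgeMillis; // 'Minimum File Age'

    EntityWindowTracker(long windowMillis, long minAgeMillis) {
        this.windowMillis = windowMillis;
        this.minAgeMillis = minAgeMillis;
    }

    // Returns the names to emit to 'success' for this pass, in input order.
    List<String> listingPass(List<Entity> current, long now) {
        List<String> toEmit = new ArrayList<>();
        for (Entity entity : current) {
            long age = now - entity.timestamp;
            if (age < minAgeMillis || age > windowMillis) {
                continue; // possibly still being written, or outside the window
            }
            Entity previous = alreadyListed.get(entity.name);
            if (previous == null || entity.timestamp > previous.timestamp || entity.size != previous.size) {
                toEmit.add(entity.name);
                alreadyListed.put(entity.name, entity);
            }
        }
        // Discard tracked entries that are now older than the window.
        alreadyListed.values().removeIf(tracked -> now - tracked.timestamp > windowMillis);
        return toEmit;
    }

    int trackedCount() {
        return alreadyListed.size();
    }
}
```

Because eviction is driven purely by last-modified age, a longer window catches more late-arriving files at the cost of a larger tracked map to persist, which is the efficiency vs. reliability trade-off described in the issue.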
[jira] [Commented] (NIFI-5406) Add new listing strategy by tracking listed entities to ListXXXX processors
[ https://issues.apache.org/jira/browse/NIFI-5406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541395#comment-16541395 ] Koji Kawamura commented on NIFI-5406: - [~joewitt] Thanks for your comments. I've updated the PR to let users choose the listing strategy. With the default setting, the existing behavior is unchanged.