[ https://issues.apache.org/jira/browse/NIFI-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aldrin Piri resolved NIFI-3545.
-------------------------------
    Resolution: Done
 Fix Version/s: 1.2.0

> Let M FlowFiles pass through once N signals arrive
> --------------------------------------------------
>
>                 Key: NIFI-3545
>                 URL: https://issues.apache.org/jira/browse/NIFI-3545
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>             Fix For: 1.2.0
>
>
> If the Wait processor can:
> "Let M flow files pass through once N notify signals have arrived for key K"
> then we can support a wider variety of use cases. Currently, it only supports
> "Let 1 flow file pass through once N notify signals have arrived for key K".
>
> h3. How it works? Simulation
> For example, let's say there are 50 incoming flow files at the beginning, f1 to f50.
> N=3, M=100
> It can be read as "the Wait processor is allowed to convert 3 signals into 100 pass tickets."
> 1. There's no signal for K, so all flow files are waiting.
> 2. Notify sends a signal. K( N=1 ) doesn't meet the Wait condition, so the Wait processor is still waiting.
> 3. Notify sends two more signals. Now K( N=3 ) matches the Wait condition.
> 4. The Wait processor converts the 3 signals into 100 passes and starts consuming flow files f1 to f50, updating K( N=0, M=50 ), where M denotes the remaining number of flow files that can go through.
> 5. Another 30 flow files arrive. The Wait processor consumes f51 to f80 and updates K( N=0, M=20 ).
> 6. Another 30 flow files arrive. The Wait processor consumes f81 to f100, and K is now K( N=0, M=0 ). Since all of N and M are used, the Wait processor removes K. f101 to f110 are waiting for signals, the same state as #1.
>
> h4. Alternative path after 6
> 7a. If Notify sends additional signals, then f101 to f110 can go through.
> 7b. If Notify doesn't send any more signals, then f101 to f110 will be routed to expired.
>
> h4. Alternative path after 5
> 6a. If Notify sends an additional signal at this point, K would be K( N=1, M=20 ). The Wait processor can still process 20 flow files because it has M=20.
> 6b. If Notify sends three additional signals, K would be K( N=3, M=20 ). The Wait processor consumes 20 flow files, and when the 21st flow file arrives, it immediately converts N to M, consuming N(3) to create M(100) passes, resulting in K( N=0, M=100 ).
>
> Additionally, we can let users configure M=0, meaning Wait can release any number of incoming flow files as long as N meets the condition.
> With this, Notify +1 can behave as if it opens a GATE, and Notify -1 closes it.
>
> h4. Another possible use case: 'Limit data flow rate cluster-wide'
> It's more complex than just supporting a GATE open/close state. However, if we let M flow files go through, Wait/Notify can also provide rate limiting across a cluster.
> Example use case: NiFi A pushes data via S2S to NiFi B and wants to limit it to 100 flow files per 5 min.
> On NiFi A:
> Notify part of flow: GenerateFlowFile(5 min, on primary) -> Notify(K, N=+1)
> Wait part of flow: Some ingested data -> Wait(K, N=1, M=100)
> Since Wait/Notify state is managed globally via DistributedCache, we can limit throughput cluster-wide.
> If a use case requires limiting the rate exactly, the Notify part can be designed as:
> GenerateFlowFile(5 min, on primary) -> Notify(K, N=0) -> Notify(K, N=+1)
> Resetting N to 0 first prevents N from accumulating when there's no traffic.
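The simulation above can be reproduced with a small in-memory model. This is a sketch under stated assumptions, not NiFi's implementation: `SignalState`, `WaitModel`, `notify` and `offer` are hypothetical names, state lives in a local object rather than the DistributedCache, an all-zero state stands in for a removed key, and each conversion is assumed to consume exactly N signals. `release_m=0` models the proposed gate mode, where flow files pass while the signal count meets N and no signals are consumed.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class SignalState:
    """Hypothetical cache entry for one signal key K."""
    n: int = 0  # signals received and not yet converted into passes
    m: int = 0  # remaining passes (flow files that may still go through)


class WaitModel:
    """In-memory sketch of the proposed Wait behavior for a single key K.

    target_n:  signals consumed per conversion (N in the issue)
    release_m: passes created per conversion (M in the issue); 0 = gate mode
    """

    def __init__(self, target_n: int, release_m: int) -> None:
        self.target_n = target_n
        self.release_m = release_m
        self.state = SignalState()
        self.waiting: deque[str] = deque()  # queued flow files, oldest first

    def notify(self, delta: int = 1) -> None:
        """Model of a Notify signal: add delta (e.g. +1 or -1) to K's count."""
        self.state.n += delta

    def offer(self, *flow_files: str) -> list[str]:
        """Queue incoming flow files and return the ones released now."""
        self.waiting.extend(flow_files)
        released = []
        while self.waiting:
            if self.release_m == 0:
                # Gate mode: release while N meets the condition; signals
                # are not consumed, so a later Notify -1 closes the gate.
                if self.state.n < self.target_n:
                    break
            else:
                if self.state.m == 0:
                    if self.state.n < self.target_n:
                        break  # no passes and too few signals: keep waiting
                    # Assumption: each conversion consumes exactly N signals.
                    self.state.n -= self.target_n
                    self.state.m += self.release_m
                self.state.m -= 1  # each released flow file uses one pass
            released.append(self.waiting.popleft())
        return released


if __name__ == "__main__":
    def ff(first: int, last: int) -> list[str]:
        return [f"f{i}" for i in range(first, last + 1)]

    wait = WaitModel(target_n=3, release_m=100)
    wait.offer(*ff(1, 50))  # step 1: no signal yet, f1..f50 wait
    wait.notify()           # step 2: K(N=1), still waiting
    wait.notify()
    wait.notify()           # step 3: K(N=3) meets the condition
    print(len(wait.offer()), wait.state)             # 50 SignalState(n=0, m=50)
    print(len(wait.offer(*ff(51, 80))), wait.state)  # 30 SignalState(n=0, m=20)
    print(len(wait.offer(*ff(81, 110))), wait.state) # 20 SignalState(n=0, m=0)
    print(list(wait.waiting) == ff(101, 110))        # True: f101..f110 wait

    gate = WaitModel(target_n=1, release_m=0)  # M=0: gate mode
    gate.notify(+1)                  # Notify +1 opens the gate
    print(gate.offer("a", "b"))      # ['a', 'b']
    gate.notify(-1)                  # Notify -1 closes it
    print(gate.offer("c"))           # []
```

Note that the model reports K( N=0, M=50 ) after step 4, because the three signals have already been converted into passes at that point.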
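The effect of the extra `Notify(K, N=0)` in the rate-limit flow can be sketched by simulating 5-minute periods. Assumptions, none of them taken from NiFi itself: `Key`, `release` and `simulate` are hypothetical names; in each period the primary node's signal arrives before that period's flow files reach Wait(K, N=1, M=100); and `Notify(K, N=0)` is modeled as resetting only the signal count, leaving any unused passes untouched, since the issue does not say otherwise.

```python
from dataclasses import dataclass


@dataclass
class Key:
    """Hypothetical cache entry for key K: pending signals (n) and passes (m)."""
    n: int = 0
    m: int = 0


def release(key: Key, queued: int, target_n: int = 1, release_m: int = 100) -> int:
    """Release up to `queued` flow files, converting target_n signals into release_m passes."""
    released = 0
    while released < queued:
        if key.m == 0:
            if key.n < target_n:
                break  # not enough signals: remaining flow files keep waiting
            key.n -= target_n
            key.m += release_m
        key.m -= 1
        released += 1
    return released


def simulate(arrivals: list[int], reset_before_notify: bool) -> list[int]:
    """Per period: the primary node's GenerateFlowFile fires Notify, then the
    flow files that arrived in that period (plus any backlog) reach Wait."""
    key = Key()
    backlog = 0
    released_per_period = []
    for arrived in arrivals:
        if reset_before_notify:
            key.n = 0  # Notify(K, N=0): drop signals left over from idle periods
        key.n += 1     # Notify(K, N=+1)
        backlog += arrived
        out = release(key, backlog)
        backlog -= out
        released_per_period.append(out)
    return released_per_period


if __name__ == "__main__":
    traffic = [0, 0, 0, 500]  # three idle periods, then a burst
    print(simulate(traffic, reset_before_notify=False))  # [0, 0, 0, 400]
    print(simulate(traffic, reset_before_notify=True))   # [0, 0, 0, 100]
```

With three idle periods followed by a burst of 500 flow files, the accumulating variant lets 400 through in one period, while resetting N first keeps it to the intended 100.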