Hi Lee,

Thanks for the link to that thread; it was one of the ones I had gone through before posting here. I don't think it works for my use case, though, since I have a JSON string coming into both the Process Group and the MergeContent. I modify the flowfile in the process group to add attributes, and when it exits and reaches the MergeContent, the JSON flowfile content is duplicated. Oddly, back pressure also didn't seem to work correctly: while I was testing things, a lot of flowfiles were queuing up inside my process group.
This thread, http://apache-nifi.1125220.n5.nabble.com/Wait-only-if-flagged-td19925.html, though, talks about the same situation; it's where Koji mentions his Wait/Notify example at https://gist.github.com/ijokarumawak/9e1a4855934f2bb9661f88ca625bd244, but as he states on that page, "The tricky part is how to setup the initial state in a DistributedMapCache in order to pass the first incoming FlowFile."

I also wondered whether a property could be added to the ConnectionDTO to allow the back pressure threshold to be applied to the entire component the connection points to, in this case a Process Group. Since the Connection knows the 'Within Group' of both sides of the connection, it could identify the Process Group and check whether any flowfiles are inside it. This could be problematic, though, for a Process Group with multiple Input Ports, and the code would have to trace the paths from an Input Port to any Output Ports to compute the back pressure information. I'm not sure this kind of suggestion conforms to the NiFi paradigm of flow processing and doing one thing well.

I'm going to try a DistributedMapCache service along the lines of Wait/Notify: use the not-found result of FetchDistributedMapCache, then RouteOnAttribute on that value to decide whether to proceed or go to a Wait processor, then PutDistributedMapCache at the end of the Process Group (or group of processors, even) and possibly a Notify, and see how that gets me forward.

Thanks,
John

--------------------------------------------
On Thu, 10/11/18, Lee Laim (leelaim) <[email protected]> wrote:

Subject: RE: [EXT] Back Pressure on Process Group
To: "[email protected]" <[email protected]>, "John McGinn" <[email protected]>
Date: Thursday, October 11, 2018, 3:49 PM

Hi John,

You can send the initiating flowfile into the Process Group and simultaneously send a duplicate flowfile around the process group into a MergeContent processor. This duplicate will act as a back pressure "latch".
Upon successful exit of the process group, a MergeContent with a strict correlation strategy will clear the back pressure latch, allowing the next flowfile into the group. Personally, I think Wait/Notify is the more elegant solution, but I successfully used the back pressure latch before Wait/Notify was readily available.

This thread might offer some additional insight: http://apache-nifi.1125220.n5.nabble.com/Having-a-processor-wait-for-all-inputs-td15614.html

Thanks,
Lee

-----Original Message-----
From: John McGinn [mailto:[email protected]]
Sent: Thursday, October 11, 2018 12:17 PM
To: [email protected]
Subject: [EXT] Back Pressure on Process Group

I've been going through mailing list archives and looking at blog posts around Wait/Notify, and these don't seem to be the solution for my use case.

My basic use case is as follows. I have 4 DB tables, 3 of which are id/name pairs (office name, city, state); the 4th table joins the 3 ids together into a new id which is used elsewhere in the database. Using NiFi to ingest data from a different database system, we have to verify whether that office is active, and if it is inactive or non-existent, create a new record, as well as any records needed in the other 3 tables.

The first step, then, is to join the 4 tables together to search on the name fields, and if the join comes back with a row, use that top-level id as an attribute. No problem, works fine. (FetchDatabaseTable -> AvroToJson -> EvaluateJsonPath, etc.) If the join comes back empty, I need to insert rows for the 3 pieces and then join them together. Ideally, this would be a flow of 3 PutSQLs, then a connection back to the top-level search of the database. (Currently I'm using a modified custom processor, LookupAttributeFromSQL, that Brett Ryan wrote on January 18th, before he worked on a SQLLookupService.)
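The 4-table lookup described above can be sketched as follows; this is a minimal illustration using sqlite3, and every table and column name here is hypothetical, standing in for the actual schema:

```python
import sqlite3

# Hypothetical schema: three id/name tables (office, city, state) and a
# fourth table joining the three ids under a new top-level id.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE office (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE city   (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE state  (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE office_location (
    id INTEGER PRIMARY KEY,
    office_id INTEGER, city_id INTEGER, state_id INTEGER
);
INSERT INTO office VALUES (1, 'Main Office');
INSERT INTO city   VALUES (1, 'Springfield');
INSERT INTO state  VALUES (1, 'IL');
INSERT INTO office_location VALUES (10, 1, 1, 1);
""")

def lookup_location_id(office, city, state):
    """Join the four tables on the name fields; return the top-level id
    (to be carried as a flowfile attribute) or None if no row matches."""
    row = conn.execute("""
        SELECT ol.id
          FROM office_location ol
          JOIN office o ON o.id = ol.office_id
          JOIN city   c ON c.id = ol.city_id
          JOIN state  s ON s.id = ol.state_id
         WHERE o.name = ? AND c.name = ? AND s.name = ?
    """, (office, city, state)).fetchone()
    return row[0] if row else None

print(lookup_location_id('Main Office', 'Springfield', 'IL'))  # 10
print(lookup_location_id('Branch', 'Springfield', 'IL'))       # None
```

A None result is the "join comes back empty" branch, where the three missing rows would be inserted before re-running the search.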
The problem is that I could have 2 records coming in with the same pieces of information, and because it's flow based, the 4-table join check will come up empty for the second record before the first record has finished creating the 4 table rows.

I've investigated the Wait/Notify pattern, but the odd part for me is that you need a separate "initialization" of the Wait/Notify release signal (https://gist.github.com/ijokarumawak/9e1a4855934f2bb9661f88ca625bd244), and that seems hack-ish to me.

With all of that said, I was curious whether there is a way to set a back pressure value of 1 into the Process Group, so that if there is a flowfile anywhere in the Process Group, no other flowfile is able to enter it. That way, the creation of the 4 records could live inside a process group, and no other flowfile could enter until the first one has exited.

Thanks for any insight,
John
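The cache-based gate proposed earlier in the thread (fetch-miss admits a flowfile, fetch-hit routes it to Wait, a put on exit releases the latch) can be sketched with an in-memory toy; this is not NiFi API, just an illustration of the control flow, and all names here are made up:

```python
from collections import deque

# Toy stand-in for a DistributedMapCache: a cache miss lets a flowfile
# enter the process group, a cache hit parks it at the Wait processor,
# and a notify on exit releases the next waiting flowfile.
cache = {}
wait_queue = deque()  # flowfiles parked at the Wait processor

def enter_group(flowfile, key="group-busy"):
    """Fetch step: miss -> enter and set the latch; hit -> park at Wait."""
    if key in cache:
        wait_queue.append(flowfile)
        return False
    cache[key] = flowfile
    return True

def exit_group(key="group-busy"):
    """Notify at the group's output: clear the latch, then admit the
    next waiting flowfile (which re-sets the latch)."""
    del cache[key]
    if wait_queue:
        enter_group(wait_queue.popleft(), key)

# Two records with the same office/city/state arrive back to back:
assert enter_group("record-1")       # first record enters the group
assert not enter_group("record-2")   # second is parked, avoiding the race
exit_group()                         # record-1 finishes its inserts
assert cache["group-busy"] == "record-2"  # record-2 now holds the latch
```

Note that the "initial state" trick Koji mentions is not needed in this shape, because a cache *miss* (rather than a stored release value) is what admits the first flowfile.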
