Hi Lee,

Thanks for the link to that thread; it was one of the ones I had gone through
before posting here. I don't think it works for my use case, though, since I
have the same JSON string coming in to both the Process Group and the
MergeContent. I modify the flow file inside the Process Group to add
attributes, and when it exits and reaches the MergeContent, the JSON flow file
content is duplicated. Oddly, the back pressure also didn't seem to work
right: while I was testing, a lot of flow files were coming into my Process
Group.

This thread,
http://apache-nifi.1125220.n5.nabble.com/Wait-only-if-flagged-td19925.html
does discuss the same situation, and is where Koji mentions his Wait/Notify
example at
https://gist.github.com/ijokarumawak/9e1a4855934f2bb9661f88ca625bd244,
but as he states on that page, "The tricky part is how to setup the initial
state in a DistributedMapCache in order to pass the first incoming
FlowFile."

I also wondered if a property could be added to the ConnectionDTO to allow
the back pressure threshold to be applied to the entire component the
connection points to, in this case a Process Group. Since the Connection
knows the 'Within Group' of both sides of the connection, it could identify
the Process Group and check whether any flow files are queued inside it.
This could be problematic, though, for a Process Group with multiple Input
Ports, or for the code that would have to trace the paths from an Input Port
to the Output Ports to compute the back pressure state. I'm also not sure
whether this type of suggestion conforms to the NiFi paradigm of flow
processing and doing one thing well.
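For what it's worth, the admission check I'm imagining might look something
like this. This is a hypothetical Python sketch of the idea, not NiFi's
actual API; the names (`can_enter_group`, `Conn`, `Group`) are all made up
for illustration:

```python
# Hypothetical sketch of "back pressure applied to a whole Process Group":
# a connection pointing at a group would only admit a new flow file when
# the total number of flow files queued anywhere inside that group is
# below the threshold (threshold=1 means one flow file in flight at a time).

class Conn:
    def __init__(self, queued_count):
        self.queued_count = queued_count  # flow files sitting in this queue

class Group:
    def __init__(self, connections):
        self.connections = connections    # all connections inside the group

def can_enter_group(group, threshold=1):
    """Return True only if fewer than `threshold` flow files are queued
    anywhere inside the destination Process Group."""
    queued = sum(conn.queued_count for conn in group.connections)
    return queued < threshold
```

With a threshold of 1, a second flow file would simply back up on the
connection until the group drained completely.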

I'm going to try a DistributedMapCache-based approach along the lines of
Wait/Notify: use the not-found result of the FetchDistributedMapCache, route
on that value with RouteOnAttribute to determine whether we can proceed or
should go to a Wait processor, and then use PutDistributedMapCache at the end
of the Process Group (or group of processors, even), possibly with a Notify,
and see how far that gets me.
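In rough pseudo-Python, the behavior I'm after is something like the
following, with a plain dict standing in for the DistributedMapCache (the
function names are mine for illustration, not NiFi processor names):

```python
# Sketch of the cache-based gate: the key is the record's natural key
# (e.g. "office|city|state"). "Not found" means nobody has created the
# rows yet, so this flow file may proceed to create them; "found" means
# they were already created (or are in flight), so route to Wait/skip.

cache = {}  # stand-in for the DistributedMapCache

def route(key):
    """FetchDistributedMapCache + RouteOnAttribute analogue:
    route on whether the key was found in the cache."""
    return "proceed" if key not in cache else "wait"

def mark_done(key):
    """PutDistributedMapCache at the end of the Process Group:
    record that the rows for this key now exist."""
    cache[key] = "created"
```

So the first flow file for a given office routes to "proceed" and creates
the rows, and any duplicate arriving afterward routes to "wait" instead of
racing it.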

Thanks,
John

--------------------------------------------
On Thu, 10/11/18, Lee Laim (leelaim) <[email protected]> wrote:

 Subject: RE: [EXT] Back Pressure on Process Group
 To: "[email protected]" <[email protected]>, "John McGinn" <[email protected]>
 Date: Thursday, October 11, 2018, 3:49 PM
 
 Hi John,  
 
 
 You can send the initiating flowfile into the Process Group and
 simultaneously send a duplicate flowfile around the Process Group into a
 MergeContent processor. This duplicate will act as a back pressure
 "latch".
 
 Upon successful exit of the Process Group, MergeContent with a strict
 correlation strategy will clear the back pressure latch, allowing the
 next flowfile into the group.
 
 Personally, I think Wait/Notify is the more elegant solution, but I have
 successfully used the back pressure latch before Wait/Notify was readily
 available.
 
 This thread might offer some additional insight:
 http://apache-nifi.1125220.n5.nabble.com/Having-a-processor-wait-for-all-inputs-td15614.html
 
 
 Thanks,
 Lee
 
 
 -----Original Message-----
 From: John McGinn [mailto:[email protected]]
 Sent: Thursday, October 11, 2018 12:17 PM
 To: [email protected]
 Subject: [EXT] Back Pressure on Process Group
 
 I've been going through mailing list archives and looking at blog posts
 around Wait/Notify, and these don't seem to be the solution for my use
 case.
 
 My basic use case is as follows. I have 4 DB tables, 3 of which are
 id/name pairs (office name, city, state), and the 4th table joins the 3
 ids together to a new id which is used elsewhere in the database.
 
 Using NiFi to ingest data from a different database system, we have to
 verify whether that office is active, and if it isn't active or doesn't
 exist, create a new record, as well as any needed records in the other 3
 tables.
 
 The first step, then, is to join the 4 tables together to search for the
 name fields, and if the join comes back with a row, use that top-level
 id as an attribute. No problem, works fine. (FetchDatabaseTable ->
 AvroToJson -> EvaluateJsonPath, etc.) If the join comes back empty, I
 need to insert rows for the 3 pieces and then join them together.
 Ideally, this would be a flow of 3 PutSQLs, then a connection back to
 the top-level search of the database. (Currently I'm using a modified
 custom processor, LookupAttributeFromSQL, that Brett Ryan did on January
 18th, before he worked on a SQLLookupService.)
 
 The problem is that I could have 2 records coming in with the same
 pieces of information, and because it's flow based, the check against
 the 4-table join will come up empty on the second record before the
 first record is done creating the 4 table records. I've investigated the
 Wait/Notify pattern, but the odd part for me is that you need a separate
 "initialization" of the Wait/Notify release signal indicator
 (https://gist.github.com/ijokarumawak/9e1a4855934f2bb9661f88ca625bd244)
 and that seems "hack-ish" to me.
 
 With all of that said, I was curious whether there is a way to have a
 back pressure value of 1 into the Process Group, so that if there is a
 flow file anywhere in the Process Group, a new flow file is unable to
 enter it. That way, the creation of the 4 records could be inside a
 Process Group, and no other flowfile could enter until the first
 flowfile had exited.
 
 Thanks for any insight,
 John
 
