Thanks a lot for this. How did you configure your nifi proprieties file. I want to make sure that I am not missing anything. Also the cert did you needed to add the SAN?
On Wed, Apr 23, 2025 at 2:38 AM Jens M. Kofoed <jmkofoed....@gmail.com> wrote: > Hi Stephane > > With my following flow I'm only interested in the amount of connections > witch has queues or backpressure enabled from 2 levels of Process groups > and down. The top level of the canvas is named NiFi Flow, so I counts the > amount of connections from and blow "NiFi Flow / Sub Process Group A / Sub > Procces Group B". We have another Monitoring system to collect and report > issues with many systems. This system has the possibility to create > webhooks where other systems can report into. For each monitoring point > this system creates a uniq tokens. So in the end of my NiFi flow, I reports > the amount of connections with issues per flowGroup, to this other > monitoring system. > > What I have done is the following. > > 1. Created a Process group at top level named "NiFi monitoring" > 2. Inside this Process Group, create an "Input Port" for Remote > Connections (site-to-site) > 1. If your setup is running secure, you will have to manage access > policies for the input port, to allow Client's to connect to it > 3. From NiFi's main menu select "Crontroller Settings" > 1. Select "Reporting Tasks" tab > 2. Add a "SiteToSiteStatusReportingTask" > 3. Define Destinationen URL to your server > 4. Define Inpurt Port Name from #2 above > 4. In your "NiFi Monitoring" process group I have the following flow > 1. Input port - for remote connections > 2. PartitionRecord > 1. Splitting records based on component Type > 2. Dyniamic Property: record.componentType = /componentType > 3. RouteOnAttribute > 1. Routing different component types to different subflows > 2. You want "Connection" types > 4. Route "Connection" type record to a "QueryRecord" > 1. This will add percents for queues for each record > 2. You will have to create a JsonTreeReader and a > JsonRecordSetWriter > 3. Add ad dynamic property named: "records" with the following > value: > SELECT *, (queuedCount*100/backPressureObjectThreshold) AS > queueCountProcent, (queuedBytes*100/backPressureBytesThreshold) AS > queuedBytesProcent FROM FLOWFILE > 5. Add an "UpdateRecord" to add fields for "parent paths" for witch > process group the connections belongs to > 1. In each record there are a field named "parentPath" which has > values like this: "NiFi Flow / Process Group A / Process Group B / > ect." > 2. In my case I'm interested in errors at level 2 and 3. If a > flow is in a deeper level it should just be reported as level 3. > Therefore > I add parentPath 2 & 3 > 3. Dynamic Property: /parentPath2 = substringBefore( > substringAfter( /parentPath, ' / ), ' / ') > 4. Dynamic Property: /parentPath3 = substringBefore( > substringAfter( substringAfter( /parentPath, ' / ) , ' / ') , ' / ') > 6. Add "QueryRecord" - Create counts for flowgroups > 1. Dynamic Property: "counts" with the following value > select parentPath 2 || '-' || parentPath3 AS flowGroup, > actorHostname, > count(case when queueCountProcent > 1 then 1 else null > end) as count_queueCountProcent, > count(case when queuedBytesProcent > 1 then 1 else > null end) as count_queuedBytesProcent, > count(case when isBackPressureEnabled = 'true' then 1 > else null end) as count_isBackPressureEnabled > FROM FLOWFILE > WHERE componentName <> 'ignore' > GROUP BY parentPath2, parentPath3, actorHostname > 7. Add a "PartitionRecord" > 1. Dyniamic property: flowGroup = /flowGroup > 8. Add an "EvaluateJsonPath" > 1. Dynamic property: > actorHostnam = $[0].actorhostname > count.isBackPressureEnabled = $[0].count.isBackPressureEnabled > count.queueCountProcent = $[0].count. queueCountProcent > count.queuedBytesProcent = $[0].count. queuedBytesProcent > flowgroup = $[0].flowgroup > 9. LookupAttribute - Get API token for each flowgroup > 1. Dynamic property: token = $(flowgroup) > 10. RouteOnAttribute > 1. Dynamic property: No Token = ${token.equels('null')} > 2. Auto terminate "No Token" > 11. UpdateAttribute > 1. Generate a text for our other monitoring system > 12. InvokeHTTP > 1. POST a message to our other monitoring system, with the text > generated above and to an url including the token from step 9. > > > This is my monitoring flow > > Kind regards > Jens M. Kofoed > > > Den man. 21. apr. 2025 kl. 23.04 skrev stephane kouajip < > stephanekoua...@gmail.com>: > >> Hi Jens, >> >> Hope you had a great easter break, >> Please when you have a chance could you please share your configurations? >> I also believe there is an issue with the cert but i can’t figure it out. >> >> Regards, >> Stephane >> >> On Sat, Apr 12, 2025 at 2:45 AM Jens M. Kofoed <jmkofoed....@gmail.com> >> wrote: >> >>> I would love to share. But I’m currently not at work and don’t have >>> access to the system at the moment. So after the easter holidays I will >>> look at it. >>> Kind regards >>> Jens >>> >>> Den 11. apr. 2025 kl. 15.45 skrev stephane kouajip < >>> stephanekoua...@gmail.com>: >>> >>> >>> Hi Jens, >>> >>> How did you configured the site2sitereporting service ? >>> I tried that approach but my site2site never worked, it was failing >>> saying >>> >>> <A4C8F9E1-2108-481F-9354-94811E545247.jpeg> >>> >>> Could you please share how you configure your nifi proprieties? >>> >>> Regards, >>> Stephane >>> >>> On Fri, Apr 11, 2025 at 3:23 AM Jens M. Kofoed <jmkofoed....@gmail.com> >>> wrote: >>> >>>> Dear Stephane >>>> >>>> I’m using a site2sitereporting service. Created a flow where I filter >>>> on connections, and calculate how many procent each connection is full. >>>> Both for flow files and for bytes (amount in queue vs. Max back pressure >>>> allowed). Later I filter connections with queues reach a threshold and send >>>> info to another system, which give me some alarms. I also create some >>>> parent group names, so I know in which group there might be a problem. >>>> >>>> Kind regards >>>> Jens >>>> >>>> Den 9. apr. 2025 kl. 20.35 skrev stephane kouajip < >>>> stephanekoua...@gmail.com>: >>>> >>>> >>>> >>>> I will take a look. >>>> >>>> Thanks >>>> >>>> On Wed, Apr 9, 2025 at 1:25 PM Daniel Chaffelson <chaffel...@gmail.com> >>>> wrote: >>>> >>>>> Hi Stephane, >>>>> I have code in NiPyAPI[1] for purging all connections in a process >>>>> group, which was created to easily reset test flows. >>>>> I expect you could use it as an example to instead inspect connection >>>>> status to determine capacity issues using some scheduled fetch and analyse >>>>> process. >>>>> >>>>> I am not aware of a native method inside NiFi for your requirement, >>>>> but other wiser minds may offer further insight. >>>>> >>>>> [1] >>>>> https://nipyapi.readthedocs.io/en/latest/nipyapi-docs/nipyapi.html#nipyapi.canvas.list_all_connections >>>>> >>>>> >>>>> On Wed, Apr 9, 2025 at 1:29 PM stephane kouajip < >>>>> stephanekoua...@gmail.com> wrote: >>>>> >>>>>> Hi NiFi Community, >>>>>> >>>>>> I'm working with Apache NiFi version 1.27.0, and I need help figuring >>>>>> out how to report or monitor when a queue is full. I want to be able to >>>>>> track and get alerts if any queue in my NiFi instance reaches its full >>>>>> capacity. >>>>>> >>>>>> I’ve looked through the NiFi documentation and community discussions, >>>>>> but I haven't found a clear approach to monitor the queues effectively. >>>>>> Can >>>>>> anyone provide advice or best practices for reporting on a full queue? >>>>>> Specifically, I'm looking for a way to: >>>>>> >>>>>> - >>>>>> >>>>>> Monitor when a queue reaches its maximum capacity. >>>>>> - >>>>>> >>>>>> Set up alerts or notifications based on queue status. >>>>>> - >>>>>> >>>>>> Log or report this event for monitoring or troubleshooting >>>>>> purposes. >>>>>> >>>>>> Any help or pointers would be greatly appreciated. Thanks in advance! >>>>>> >>>>>