Hi Stephane

In my flow I'm only interested in the number of connections which have
queued data or back pressure engaged, from two levels of process groups
and down. The top level of the canvas is named "NiFi Flow", so I count
the connections from and below "NiFi Flow / Sub Process Group A / Sub
Process Group B". We have another monitoring system that collects and
reports issues across many systems. It can create webhooks that other
systems can report into, and for each monitoring point it creates a
unique token. So at the end of my NiFi flow, I report the number of
connections with issues per flowGroup to this other monitoring system.
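As a sketch of that final reporting step: the endpoint, path layout and token here are made up for illustration (your monitoring system's webhook will differ), but the call NiFi ends up making is conceptually just this:

```python
import json
from urllib import request

# Illustrative only: replace MONITOR_BASE and the token with your monitoring
# system's real webhook endpoint and the per-flowGroup token it generated.
MONITOR_BASE = "https://monitoring.example.com/webhook"

def build_report(flow_group, token, issue_count):
    """Builds the URL and JSON body that InvokeHTTP would POST."""
    url = f"{MONITOR_BASE}/{token}"
    body = json.dumps({
        "flowGroup": flow_group,
        "connectionsWithIssues": issue_count,
    })
    return url, body

def post_report(flow_group, token, issue_count):
    """Sends the report; equivalent to the InvokeHTTP step at the end of the flow."""
    url, body = build_report(flow_group, token, issue_count)
    req = request.Request(url, data=body.encode(), method="POST",
                          headers={"Content-Type": "application/json"})
    with request.urlopen(req) as resp:  # real network call, not run here
        return resp.status
```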

What I have done is the following.

   1. Created a process group at the top level named "NiFi Monitoring"
   2. Inside this process group, created an "Input Port" for Remote
   Connections (site-to-site)
      1. If your setup is running securely, you will have to manage access
      policies for the input port to allow clients to connect to it
   3. From NiFi's main menu, selected "Controller Settings"
      1. Select the "Reporting Tasks" tab
      2. Add a "SiteToSiteStatusReportingTask"
      3. Set the Destination URL to your server
      4. Set the Input Port Name to the port name from #2 above
   4. Inside the "NiFi Monitoring" process group, I have the following flow
      1. Input port - for remote connections
      2. PartitionRecord
         1. Splits records based on component type
         2. Dynamic Property: record.componentType = /componentType
      3. RouteOnAttribute
         1. Routes different component types to different subflows
         2. You want the "Connection" type records
      4. Route "Connection" type records to a "QueryRecord"
         1. This adds queue-fullness percentages to each record
         2. You will have to create a JsonTreeReader and a
         JsonRecordSetWriter
         3. Add a dynamic property named "records" with the following
         value:
         SELECT *, (queuedCount*100/backPressureObjectThreshold) AS
         queueCountProcent, (queuedBytes*100/backPressureBytesThreshold) AS
         queuedBytesProcent FROM FLOWFILE
      5. Add an "UpdateRecord" to add "parent path" fields showing which
      process group each connection belongs to
         1. Each record has a field named "parentPath" with values like
         this: "NiFi Flow / Process Group A / Process Group B / etc."
         2. In my case I'm interested in errors at levels 2 and 3. If a
         flow sits at a deeper level, it should just be reported as level
         3. Therefore I add parentPath2 and parentPath3.
         3. Dynamic Property: /parentPath2 = substringBefore(
         substringAfter( /parentPath, ' / ' ), ' / ')
         4. Dynamic Property: /parentPath3 = substringBefore(
         substringAfter( substringAfter( /parentPath, ' / ' ), ' / ' ), ' / ')
      6. Add a "QueryRecord" - create counts per flowGroup
         1. Dynamic Property: "counts" with the following value:
         SELECT parentPath2 || '-' || parentPath3 AS flowGroup,
                   actorHostname,
                   count(case when queueCountProcent > 1 then 1 else null
         end) as count_queueCountProcent,
                   count(case when queuedBytesProcent > 1 then 1 else
         null end) as count_queuedBytesProcent,
                   count(case when isBackPressureEnabled = 'true' then 1
         else null end) as count_isBackPressureEnabled
         FROM FLOWFILE
         WHERE componentName <> 'ignore'
         GROUP BY parentPath2, parentPath3, actorHostname
      7. Add a "PartitionRecord"
         1. Dynamic property: flowGroup = /flowGroup
      8. Add an "EvaluateJsonPath"
         1. Dynamic properties (field names match the SQL aliases above):
         actorHostname = $[0].actorHostname
         count.isBackPressureEnabled = $[0].count_isBackPressureEnabled
         count.queueCountProcent = $[0].count_queueCountProcent
         count.queuedBytesProcent = $[0].count_queuedBytesProcent
         flowGroup = $[0].flowGroup
      9. LookupAttribute - get the API token for each flowGroup
         1. Dynamic property: token = ${flowGroup}
      10. RouteOnAttribute
         1. Dynamic property: No Token = ${token:equals('null')}
         2. Auto-terminate the "No Token" relationship
      11. UpdateAttribute
         1. Generates the message text for our other monitoring system
      12. InvokeHTTP
         1. POSTs the message to our other monitoring system, with the
         text generated above, to a URL that includes the token from step 9.
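To make the record math above concrete, here is a small Python sketch (outside NiFi, purely illustrative) of what steps 4.4-4.6 compute: the queue-fullness percentages, the parentPath2/parentPath3 extraction, and the per-flowGroup issue counts. Field names follow the SiteToSiteStatusReportingTask connection records; the helper functions only approximate NiFi's RecordPath behaviour.

```python
from collections import defaultdict

def substring_after(s, sep):
    # Mimics RecordPath substringAfter: text after the first occurrence of sep
    # (here "" if sep is absent; NiFi's edge-case behaviour may differ).
    i = s.find(sep)
    return s[i + len(sep):] if i >= 0 else ""

def substring_before(s, sep):
    # Mimics RecordPath substringBefore: text before the first occurrence of sep.
    i = s.find(sep)
    return s[:i] if i >= 0 else s

def enrich(record):
    # Step 4.4: queue fullness as a percentage of the back-pressure thresholds
    # (integer division, like the SQL on integer fields).
    record["queueCountProcent"] = record["queuedCount"] * 100 // record["backPressureObjectThreshold"]
    record["queuedBytesProcent"] = record["queuedBytes"] * 100 // record["backPressureBytesThreshold"]
    # Step 4.5: second and third segments of "NiFi Flow / A / B / ..."
    record["parentPath2"] = substring_before(substring_after(record["parentPath"], " / "), " / ")
    record["parentPath3"] = substring_before(
        substring_after(substring_after(record["parentPath"], " / "), " / "), " / ")
    return record

def count_issues(records):
    # Step 4.6: per (flowGroup, actorHostname), count connections over 1% full
    # or with back pressure engaged.
    counts = defaultdict(lambda: {"count_queueCountProcent": 0,
                                  "count_queuedBytesProcent": 0,
                                  "count_isBackPressureEnabled": 0})
    for r in records:
        c = counts[(r["parentPath2"] + "-" + r["parentPath3"], r["actorHostname"])]
        if r["queueCountProcent"] > 1:
            c["count_queueCountProcent"] += 1
        if r["queuedBytesProcent"] > 1:
            c["count_queuedBytesProcent"] += 1
        if r["isBackPressureEnabled"] == "true":
            c["count_isBackPressureEnabled"] += 1
    return dict(counts)
```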


This is my monitoring flow

Kind regards
Jens M. Kofoed


Den man. 21. apr. 2025 kl. 23.04 skrev stephane kouajip <
stephanekoua...@gmail.com>:

> Hi Jens,
>
> Hope you had a great easter break,
> Please, when you have a chance, could you share your configurations?
> I also believe there is an issue with the cert but I can’t figure it out.
>
> Regards,
> Stephane
>
> On Sat, Apr 12, 2025 at 2:45 AM Jens M. Kofoed <jmkofoed....@gmail.com>
> wrote:
>
>> I would love to share. But I’m currently not at work and don’t have
>> access to the system at the moment. So after the easter holidays I will
>> look at it.
>> Kind regards
>> Jens
>>
>> Den 11. apr. 2025 kl. 15.45 skrev stephane kouajip <
>> stephanekoua...@gmail.com>:
>>
>> 
>> Hi Jens,
>>
>> How did you configure the site2site reporting service?
>> I tried that approach but my site2site never worked; it kept failing
>> with:
>>
>> <A4C8F9E1-2108-481F-9354-94811E545247.jpeg>
>>
>> Could you please share how you configured your nifi properties?
>>
>> Regards,
>> Stephane
>>
>> On Fri, Apr 11, 2025 at 3:23 AM Jens M. Kofoed <jmkofoed....@gmail.com>
>> wrote:
>>
>>> Dear Stephane
>>>
>>> I’m using a site2site reporting service. I created a flow where I filter
>>> on connections and calculate what percentage full each connection is, both
>>> for flow files and for bytes (amount in queue vs. max back pressure
>>> allowed). Later I filter out connections whose queues reach a threshold
>>> and send the info to another system, which gives me alarms. I also create
>>> some parent group names, so I know in which group there might be a problem.
>>>
>>> Kind regards
>>> Jens
>>>
>>> Den 9. apr. 2025 kl. 20.35 skrev stephane kouajip <
>>> stephanekoua...@gmail.com>:
>>>
>>> 
>>>
>>> I will take a look.
>>>
>>> Thanks
>>>
>>> On Wed, Apr 9, 2025 at 1:25 PM Daniel Chaffelson <chaffel...@gmail.com>
>>> wrote:
>>>
>>>> Hi Stephane,
>>>> I have code in NiPyAPI[1] for purging all connections in a process
>>>> group, which was created to easily reset test flows.
>>>> I expect you could use it as an example to instead inspect connection
>>>> status to determine capacity issues using some scheduled fetch and analyse
>>>> process.
>>>>
>>>> I am not aware of a native method inside NiFi for your requirement, but
>>>> other wiser minds may offer further insight.
>>>>
>>>> [1]
>>>> https://nipyapi.readthedocs.io/en/latest/nipyapi-docs/nipyapi.html#nipyapi.canvas.list_all_connections
>>>>
>>>>
>>>> On Wed, Apr 9, 2025 at 1:29 PM stephane kouajip <
>>>> stephanekoua...@gmail.com> wrote:
>>>>
>>>>> Hi NiFi Community,
>>>>>
>>>>> I'm working with Apache NiFi version 1.27.0, and I need help figuring
>>>>> out how to report or monitor when a queue is full. I want to be able to
>>>>> track and get alerts if any queue in my NiFi instance reaches its full
>>>>> capacity.
>>>>>
>>>>> I’ve looked through the NiFi documentation and community discussions,
>>>>> but I haven't found a clear approach to monitor the queues effectively. 
>>>>> Can
>>>>> anyone provide advice or best practices for reporting on a full queue?
>>>>> Specifically, I'm looking for a way to:
>>>>>
>>>>>    -
>>>>>
>>>>>    Monitor when a queue reaches its maximum capacity.
>>>>>    -
>>>>>
>>>>>    Set up alerts or notifications based on queue status.
>>>>>    -
>>>>>
>>>>>    Log or report this event for monitoring or troubleshooting
>>>>>    purposes.
>>>>>
>>>>> Any help or pointers would be greatly appreciated. Thanks in advance!
>>>>>
>>>>
