Thanks a lot for this.
How did you configure your nifi proprieties file.
I want to make sure that I am not missing anything.
Also the cert did you needed to add the SAN?

On Wed, Apr 23, 2025 at 2:38 AM Jens M. Kofoed <jmkofoed....@gmail.com>
wrote:

> Hi Stephane
>
> With my following flow I'm only interested in the amount of connections
> witch has queues or backpressure enabled from 2 levels of Process groups
> and down.  The top level of the canvas is named NiFi Flow, so I counts the
> amount of connections from and blow "NiFi Flow / Sub Process Group A / Sub
> Procces Group B". We have another Monitoring system to collect and report
> issues with many systems. This system has the possibility to create
> webhooks where other systems can report into. For each monitoring point
> this system creates a uniq tokens. So in the end of my NiFi flow, I reports
> the amount of connections with issues per flowGroup, to this other
> monitoring system.
>
> What I have done is the following.
>
>    1. Created a Process group at top level named "NiFi monitoring"
>    2. Inside this Process Group, create an "Input Port" for Remote
>    Connections (site-to-site)
>       1. If your setup is running secure, you will have to manage access
>       policies for the input port, to allow Client's to connect to it
>    3. From NiFi's main menu select "Crontroller Settings"
>       1. Select "Reporting Tasks" tab
>       2. Add a "SiteToSiteStatusReportingTask"
>       3. Define Destinationen URL to your server
>       4. Define Inpurt Port Name from #2 above
>    4. In your "NiFi Monitoring" process group I have the following flow
>       1. Input port - for remote connections
>       2. PartitionRecord
>          1. Splitting records based on component Type
>          2. Dyniamic Property: record.componentType = /componentType
>       3. RouteOnAttribute
>          1. Routing different component types to different subflows
>          2. You want "Connection" types
>       4. Route "Connection" type record to a "QueryRecord"
>          1. This will add percents for queues for each record
>          2. You will have to create a JsonTreeReader and a
>          JsonRecordSetWriter
>          3. Add ad dynamic property named: "records" with the following
>          value:
>          SELECT *, (queuedCount*100/backPressureObjectThreshold) AS
>          queueCountProcent, (queuedBytes*100/backPressureBytesThreshold) AS
>          queuedBytesProcent FROM FLOWFILE
>       5. Add an "UpdateRecord" to add fields for "parent paths" for witch
>       process group the connections belongs to
>          1. In each record there are a field named "parentPath" which has
>          values like this: "NiFi Flow / Process Group A / Process Group B / 
> ect."
>          2. In my case I'm interested in errors at level 2 and 3. If a
>          flow is in a deeper level it should just be reported as level 3. 
> Therefore
>          I add parentPath 2 & 3
>          3. Dynamic Property: /parentPath2 = substringBefore(
>          substringAfter( /parentPath, ' / ), ' / ')
>          4. Dynamic Property: /parentPath3 = substringBefore(
>          substringAfter( substringAfter( /parentPath, ' / ) , ' / ') , ' / ')
>       6. Add "QueryRecord" - Create counts for flowgroups
>          1. Dynamic Property: "counts" with the following value
>          select parentPath 2 || '-' || parentPath3 AS flowGroup,
>                    actorHostname,
>                    count(case when queueCountProcent > 1 then 1 else null
>          end) as count_queueCountProcent,
>                    count(case when queuedBytesProcent   > 1 then 1 else
>          null end) as count_queuedBytesProcent,
>                    count(case when isBackPressureEnabled = 'true' then 1
>          else null end) as count_isBackPressureEnabled
>          FROM FLOWFILE
>          WHERE componentName <> 'ignore'
>          GROUP BY parentPath2, parentPath3, actorHostname
>       7. Add a "PartitionRecord"
>          1. Dyniamic property: flowGroup = /flowGroup
>       8. Add an "EvaluateJsonPath"
>          1. Dynamic property:
>          actorHostnam = $[0].actorhostname
>          count.isBackPressureEnabled = $[0].count.isBackPressureEnabled
>          count.queueCountProcent   = $[0].count. queueCountProcent
>          count.queuedBytesProcent     = $[0].count. queuedBytesProcent
>          flowgroup = $[0].flowgroup
>       9. LookupAttribute - Get API token for each flowgroup
>          1. Dynamic property: token = $(flowgroup)
>       10. RouteOnAttribute
>          1. Dynamic property: No Token = ${token.equels('null')}
>          2. Auto terminate "No Token"
>       11. UpdateAttribute
>          1. Generate a text for our other monitoring system
>       12. InvokeHTTP
>          1. POST a message to our other monitoring system, with the text
>          generated above and to an url including the token from step 9.
>
>
> This is my monitoring flow
>
> Kind regards
> Jens M. Kofoed
>
>
> Den man. 21. apr. 2025 kl. 23.04 skrev stephane kouajip <
> stephanekoua...@gmail.com>:
>
>> Hi Jens,
>>
>> Hope you had a great easter break,
>> Please when you have a chance could you please share your configurations?
>> I also believe there is an issue with the cert but i can’t figure it out.
>>
>> Regards,
>> Stephane
>>
>> On Sat, Apr 12, 2025 at 2:45 AM Jens M. Kofoed <jmkofoed....@gmail.com>
>> wrote:
>>
>>> I would love to share. But I’m currently not at work and don’t have
>>> access to the system at the moment. So after the easter holidays I will
>>> look at it.
>>> Kind regards
>>> Jens
>>>
>>> Den 11. apr. 2025 kl. 15.45 skrev stephane kouajip <
>>> stephanekoua...@gmail.com>:
>>>
>>> 
>>> Hi Jens,
>>>
>>> How did you configured the site2sitereporting service ?
>>> I tried that approach but my site2site never worked, it was failing
>>> saying
>>>
>>> <A4C8F9E1-2108-481F-9354-94811E545247.jpeg>
>>>
>>> Could you please share how you configure your nifi proprieties?
>>>
>>> Regards,
>>> Stephane
>>>
>>> On Fri, Apr 11, 2025 at 3:23 AM Jens M. Kofoed <jmkofoed....@gmail.com>
>>> wrote:
>>>
>>>> Dear Stephane
>>>>
>>>> I’m using a site2sitereporting service. Created a flow where I filter
>>>> on connections, and calculate how many procent each connection is full.
>>>> Both for flow files and for bytes (amount in queue vs. Max back pressure
>>>> allowed). Later I filter connections with queues reach a threshold and send
>>>> info to another system, which give me some alarms. I also create some
>>>> parent group names, so I know in which group there might be a problem.
>>>>
>>>> Kind regards
>>>> Jens
>>>>
>>>> Den 9. apr. 2025 kl. 20.35 skrev stephane kouajip <
>>>> stephanekoua...@gmail.com>:
>>>>
>>>> 
>>>>
>>>> I will take a look.
>>>>
>>>> Thanks
>>>>
>>>> On Wed, Apr 9, 2025 at 1:25 PM Daniel Chaffelson <chaffel...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Stephane,
>>>>> I have code in NiPyAPI[1] for purging all connections in a process
>>>>> group, which was created to easily reset test flows.
>>>>> I expect you could use it as an example to instead inspect connection
>>>>> status to determine capacity issues using some scheduled fetch and analyse
>>>>> process.
>>>>>
>>>>> I am not aware of a native method inside NiFi for your requirement,
>>>>> but other wiser minds may offer further insight.
>>>>>
>>>>> [1]
>>>>> https://nipyapi.readthedocs.io/en/latest/nipyapi-docs/nipyapi.html#nipyapi.canvas.list_all_connections
>>>>>
>>>>>
>>>>> On Wed, Apr 9, 2025 at 1:29 PM stephane kouajip <
>>>>> stephanekoua...@gmail.com> wrote:
>>>>>
>>>>>> Hi NiFi Community,
>>>>>>
>>>>>> I'm working with Apache NiFi version 1.27.0, and I need help figuring
>>>>>> out how to report or monitor when a queue is full. I want to be able to
>>>>>> track and get alerts if any queue in my NiFi instance reaches its full
>>>>>> capacity.
>>>>>>
>>>>>> I’ve looked through the NiFi documentation and community discussions,
>>>>>> but I haven't found a clear approach to monitor the queues effectively. 
>>>>>> Can
>>>>>> anyone provide advice or best practices for reporting on a full queue?
>>>>>> Specifically, I'm looking for a way to:
>>>>>>
>>>>>>    -
>>>>>>
>>>>>>    Monitor when a queue reaches its maximum capacity.
>>>>>>    -
>>>>>>
>>>>>>    Set up alerts or notifications based on queue status.
>>>>>>    -
>>>>>>
>>>>>>    Log or report this event for monitoring or troubleshooting
>>>>>>    purposes.
>>>>>>
>>>>>> Any help or pointers would be greatly appreciated. Thanks in advance!
>>>>>>
>>>>>

Reply via email to