Re: [E] Lagging worker nodes

2021-01-29 Thread Zilvinas Saltys
That makes a lot of sense :) Thanks!

Re: [E] Lagging worker nodes

2021-01-29 Thread Mark Payne
Zilvinas,

That’s fantastic! I can’t say for certain without delving in a whole lot 
deeper, but I would guess that the disk reads have dropped significantly 
because disk caching is now much better utilized. Before, you were bringing in 
a bunch of data and writing to disk. That would evict “old” data from the 
cache. When you then went to read the FlowFile content, it would have to go to 
disk. Now, you’re not holding as much data in the content repository because 
the backpressure is preventing you from bringing in huge amounts until you’ve 
dealt with what you have. As a result, you don’t evict the content from the OS 
disk cache, so when you go to read, it’s just coming from the disk cache. So 
that would mean way less disk reading and way better throughput. If I recall 
correctly, you’re running on AWS with a 200 GB EBS volume for the content repo 
(though I could be remembering a different thread). That’s not a particularly 
fast volume, so it makes sense that disk caching would give you dramatically 
better performance.
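
If you want to confirm that on a node, a rough sketch like the one below (assuming psutil is available, and swapping "nvme1n1" for whatever device actually backs your content repository) will show the per-device read rate falling off once the cache is warm:

import time
import psutil

# Rough sketch: watch per-device read throughput to see whether content-repo
# reads are being served from the OS page cache. "nvme1n1" is a placeholder;
# use the device name you see in iostat for the content_repository volume.
DEVICE = "nvme1n1"

prev = psutil.disk_io_counters(perdisk=True)[DEVICE].read_bytes
while True:
    time.sleep(5)
    cur = psutil.disk_io_counters(perdisk=True)[DEVICE].read_bytes
    print(f"read MB/s: {(cur - prev) / 5 / 1024 / 1024:.1f}")
    prev = cur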

Thanks
-Mark



On Jan 29, 2021, at 10:16 AM, Zilvinas Saltys <zilvinas.sal...@verizonmedia.com> wrote:

Thanks Mark.

That explains it. It makes sense as I know thread count is also per node. I 
know from working with my team that it's counterintuitive to people. I set the 
queue to max 1000 items but then I see 19,000 in a 25 node cluster. It can be 
confusing. It's kinda obvious now that you've pointed it out though. Perhaps 
expanding the tooltips to explain that the settings are per node could be 
helpful.

What I find very interesting is that after your suggested changes Disk READ IO
dropped on all nodes while performance improved. Please see the attached
screenshot. I would guess that when nodes have a low number of items in the
queue NiFi somehow manages to avoid some of the DISK IO between processors. Is
that possible?

Thanks again, you've saved our NIFI ;)

Re: [E] Lagging worker nodes

2021-01-29 Thread Mark Payne
Zilvinas,

The backpressure is per-node, not cluster-wide. So if you set a backpressure of 
100, that means that each node will allow up to 100 FlowFiles into that 
connection. In fact, almost all of these types of settings in NiFi are per-node
settings. The number of concurrent tasks, the thread pool size, backpressure, 
sizes of the repositories, etc. Generally, this works very nicely, because it 
allows you to ensure that each node is bringing in only the work that it can 
handle.
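
As a rough illustration of the arithmetic only (this is not something NiFi computes for you):

per_node_threshold = 100     # the back pressure object threshold you configure
node_count = 10              # nodes in the cluster
# Each node enforces the threshold independently, so the cluster-wide ceiling is:
print(per_node_threshold * node_count)   # up to 1,000 FlowFiles across the cluster
# It's also why a threshold of 1000 on a 25-node cluster can legitimately show
# ~19,000 queued FlowFiles cluster-wide: some nodes simply aren't full yet.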

Thanks
-Mark

Re: [E] Lagging worker nodes

2021-01-29 Thread Zilvinas Saltys
Mark,

Thank you for your detailed reply. Your suggestions are very helpful and at
least for now it seems we've stopped accumulating large queues on some of
the nodes. I do have additional questions about back pressure if you don't
mind. I've watched all your videos, thanks for sharing them with me.
Perhaps the next one could be about back pressure?

The way I understand it is that back pressure is configured for the whole
connection as an aggregate for all the nodes in the cluster. Meaning if we
have a queue with 100 items and 10 nodes in the cluster then in the perfect
scenario each node would have 10 items each. My question is how does it
actually work internally? If a queue is configured at 100 items max and it
now has gone down to 99 and there's now room for 1 more and there are 10
flowfiles upstream available on every host - which host on the cluster will
get to fill that 1 slot? The only way I could think of this working is that
when I define a backpressure of 100, NiFi internally sets every node's
BP to 100/node_count. Is this how it works internally?

Thanks for your help,
Z.

Re: [E] Lagging worker nodes

2021-01-28 Thread Mark Payne
Zilvinas,

Also I figured I would throw out links to a few videos that I’ve posted on 
YouTube regarding NiFi Anti-Patterns, as they may be helpful for you.

Part 4: Scheduling / Concurrent Tasks / Thread Pools. Discusses how to 
determine the appropriate size of the Timer-Driven Thread Pool and concurrent 
tasks, and the pitfalls that you’ll encounter if not well sized. Also discusses 
some other aspects to look into if increasing threads is not helping 
performance. Seems very relevant to the conversation you had with Joe Witt the
other day about the number of concurrent tasks, etc.
https://www.youtube.com/watch?v=pZq0EbfDBy4

Part 3: Load balanced connections. Discusses when to use and when not to, how 
to configure properly. Very much related to this conversation. 
https://www.youtube.com/watch?v=by9P0Zi8Dk8

Part 2: Poor flow layout. This is mostly a walkthrough of how to lay out the 
components cleanly and shows some of the tips & tricks to help line things up 
cleanly, document flow, etc. https://www.youtube.com/watch?v=v1CoQk730qs

Part 1: Avoid splitting & re-merging data, treating FlowFile content as 
Attributes, and Parsing structured/semi-structured textual data as unstructured 
data. I’ve not seen these in your flow, but helpful to know:  
https://www.youtube.com/watch?v=RjWstt7nRVY

Hope these are helpful!

Thanks
-Mark

Re: [E] Lagging worker nodes

2021-01-28 Thread Zilvinas Saltys
Thanks Mark. I see what you're saying. I'll try out your ideas tomorrow and
see how it goes. Thanks a lot!

Re: [E] Lagging worker nodes

2021-01-28 Thread Mark Payne
… And now that I’m reading more about how your flow is set up (I didn’t see that
the source of the data was SQS at first)… I would not recommend using 
load-balanced connections at all.

Instead, you should be able to rely solely on backpressure to allow for evenly 
distributing the data. If you configure backpressure appropriately, you will 
really only have the amount of data on each node that it can work on at once 
(plus maybe a little extra to avoid higher latency).

So if PublishKafka is your bottleneck and has 4 threads, the connection coming 
into PublishKafka should have a max queue size of maybe 8 FlowFiles. This will 
cause backpressure to flow back up the chain, and each queue should have very 
small backpressure limits. As a result, the nodes will only pull the SQS
messages as they are able to handle them. You really only want to pull the data
as you’re able to process it.
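
If you end up adjusting a lot of connections, scripting it against the REST API is an option. A rough sketch (the host, connection id, and lack of authentication are all assumptions to adapt; double-check the field names against the REST API docs for your NiFi version):

import requests

NIFI = "http://localhost:8080/nifi-api"          # assumption: unsecured node
CONNECTION_ID = "replace-with-the-connection-uuid"

# Fetch the connection so we have the current revision, then push back an
# updated object threshold (applied per node, as discussed above).
entity = requests.get(f"{NIFI}/connections/{CONNECTION_ID}").json()
update = {
    "revision": entity["revision"],
    "component": {
        "id": CONNECTION_ID,
        "backPressureObjectThreshold": 8,        # e.g. 2x the 4 PublishKafka threads
    },
}
resp = requests.put(f"{NIFI}/connections/{CONNECTION_ID}", json=update)
resp.raise_for_status()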

Re: [E] Lagging worker nodes

2021-01-28 Thread Mark Payne
Zilvinas,

That is accurate - when a connection is load balanced, the data is pushed to a 
particular node based on the selected algorithm. It is not continually 
rebalanced.

So for a flow like this, my recommendation would be:

1) Set the backpressure threshold from FetchS3Object -> PublishKafka to 
something “small.” Probably 1 or 2x the number of concurrent tasks that you 
have for PublishKafka. That means that you’ll queue up little data there. That 
will cause the data to instead build up on the connection between 
EvaluateJsonPath -> FetchS3Object.

2) Set the backpressure threshold from EvaluateJsonPath -> FetchS3Object to 
something small also. Maybe 2x-4x the number of nodes in the cluster. This will 
result in data not queuing up here. As a result, as the size of this queue gets 
smaller on one node, the data will begin to flow in and get distributed to one 
of the nodes in the cluster.

By keeping these queues small, essentially this will cause the “load balanced” 
data to stay small. As the faster nodes work off their queue, it will allow 
more data to flow in and be load balanced. The nodes that are not performing 
well will still have backpressure applied so they won’t get much of the data as 
it flows in.
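
If it helps to keep the rules of thumb straight, they boil down to something like this (purely illustrative; the multipliers are just the suggestions above, nothing NiFi-specific):

def suggested_thresholds(publish_kafka_tasks: int, node_count: int) -> dict:
    """Back pressure object thresholds (per node) from the rules of thumb above."""
    return {
        # FetchS3Object -> PublishKafka: roughly 1-2x the PublishKafka concurrent tasks
        "fetch_to_publish": 2 * publish_kafka_tasks,
        # EvaluateJsonPath -> FetchS3Object: roughly 2-4x the number of nodes
        "eval_to_fetch": 4 * node_count,
    }

print(suggested_thresholds(publish_kafka_tasks=4, node_count=25))
# {'fetch_to_publish': 8, 'eval_to_fetch': 100}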

As your flow is right now, there’s no backpressure being hit in the 
load-balanced queue. As a result, data streams in as fast as it can and gets 
distributed across the cluster. And the nodes that can’t keep up already have a 
huge backlog of SQS messages. So making this adjustment will help to distribute
the data to the nodes as they become able to handle it.

Thanks
-Mark

On Jan 28, 2021, at 3:27 PM, Zilvinas Saltys <zilvinas.sal...@verizonmedia.com> wrote:

My other issue is that the balancing is not rebalancing the queue? Perhaps I
misunderstand how balancing should work and it only balances new incoming files
round robin? I can easily rebalance manually by disabling balancing and
enabling it again, but after a while it gets back to the same situation where
some nodes fall further and further behind while others remain fine.

On Thu, Jan 28, 2021 at 8:22 PM Zilvinas Saltys <zilvinas.sal...@verizonmedia.com> wrote:
Hi Joe,

Yes, it is the same issue. We have used your advice and reduced the number of
threads on our large processors (fetch/compress/publish) to a minimum and then
increased them gradually to 4 until the processing rate became acceptable (about
2000 files per 5 min). This is a cluster of 25 nodes with 36 cores each.

On Thu, Jan 28, 2021 at 8:19 PM Joe Witt <joe.w...@gmail.com> wrote:
I'm assuming also this is the same thing Maksym was asking about yesterday.  
Let's try to keep the thread together as this gets discussed.

On Thu, Jan 28, 2021 at 1:10 PM Pierre Villard <pierre.villard...@gmail.com> wrote:
Hi Zilvinas,

I'm afraid we would need more details to help you out here.

My first question by quickly looking at the graph would be: there is a host 
(green line) where the number of queued flow files is more or less constantly 
growing. Where in the flow are the flow files accumulating for this node? What 
processor is creating back pressure? Do we have anything in the log for this 
node around the time when flow files start accumulating?

Thanks,
Pierre

On Fri, Jan 29, 2021 at 00:02, Zilvinas Saltys <zilvinas.sal...@verizonmedia.com> wrote:
Hi,

We run a 25-node NiFi cluster on version 1.12. We're processing about 2000 
files per 5 mins where each file is from 100 to 500 megabytes.

What I notice is that some workers degrade in performance and keep accumulating
a backlog of queued files. See the attached screenshots, which show two hosts,
one of which is degraded.

One seemingly dead giveaway is that the degraded node starts doing heavy,
intensive disk read IO while the other node keeps doing none. I ran iostat on
those nodes and I know that the read IOs are on the content_repository
directory. But it makes no sense to me how some of the nodes that are doing
these heavy tasks are doing no disk read IO. In this example I know that both
nodes are processing roughly the same number of files of roughly the same size.

The pipeline is somewhat simple:
1) Read from SQS
2) Fetch file contents from S3
3) Publish file contents to Kafka
4) Compress file contents
5) Put compressed contents back to S3

All of these operations, to my understanding, should require heavy reads from
local disk to fetch file contents from the content repository. How is it
possible that some nodes are processing lots of files while showing no disk
reads, and then suddenly spike in disk reads and degrade?
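
For a rough sense of scale (assuming ~300 MB average file size and that the Kafka publish, the compression, and the final S3 upload each read the content once; those are my assumptions, not measurements):

files_per_5min = 2000
nodes = 25
avg_mb = 300          # assumed average of the 100-500 MB range
reads_per_file = 3    # publish + compress + upload each reading the content once

mb_per_node_per_sec = files_per_5min / nodes * avg_mb * reads_per_file / 300
print(f"~{mb_per_node_per_sec:.0f} MB/s of content-repo reads per node if nothing is cached")
# ~240 MB/s -- a lot to ask of a modest EBS volume, so it matters a great deal
# whether these reads are served from the OS page cache or from the disk.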

Any clues would be really helpful.
Thanks.