Repository: nifi
Updated Branches:
  refs/heads/master c7ff2fc5d -> 234ddb0fe


NIFI-5745: When determining if backpressure should be applied across nodes for 
load balancing, only consider if the local partition has reached the threshold 
limits instead of considering the size of the entire queue

This closes #3108.

Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/234ddb0f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/234ddb0f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/234ddb0f

Branch: refs/heads/master
Commit: 234ddb0fe1a36ad947c340114058d82c777d791f
Parents: c7ff2fc
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Oct 24 13:06:57 2018 -0400
Committer: Koji Kawamura <ijokaruma...@apache.org>
Committed: Fri Oct 26 16:04:46 2018 +0900

----------------------------------------------------------------------
 .../nifi/controller/queue/LoadBalancedFlowFileQueue.java       | 6 ++++++
 .../apache/nifi/controller/queue/AbstractFlowFileQueue.java    | 5 ++++-
 .../queue/clustered/SocketLoadBalancedFlowFileQueue.java       | 5 +++++
 .../queue/clustered/server/StandardLoadBalanceProtocol.java    | 4 +++-
 4 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/234ddb0f/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java
 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java
index f0eff27..b9f6951 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/LoadBalancedFlowFileQueue.java
@@ -66,4 +66,10 @@ public interface LoadBalancedFlowFileQueue extends 
FlowFileQueue {
      */
     boolean isPropagateBackpressureAcrossNodes();
 
+    /**
+     * Determines whether or not the local partition's size >= backpressure 
threshold
+     *
+     * @return <code>true</code> if the number of FlowFiles or total size of 
FlowFiles in the local partition alone meets or exceeds the backpressure 
threshold, <code>false</code> otherwise.
+     */
+    boolean isLocalPartitionFull();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/234ddb0f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
index 5bf75a4..436b85d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java
@@ -141,6 +141,10 @@ public abstract class AbstractFlowFileQueue implements 
FlowFileQueue {
 
     @Override
     public boolean isFull() {
+        return isFull(size());
+    }
+
+    protected boolean isFull(final QueueSize queueSize) {
         final MaxQueueSize maxSize = getMaxQueueSize();
 
         // Check if max size is set
@@ -148,7 +152,6 @@ public abstract class AbstractFlowFileQueue implements 
FlowFileQueue {
             return false;
         }
 
-        final QueueSize queueSize = size();
         if (maxSize.getMaxCount() > 0 && queueSize.getObjectCount() >= 
maxSize.getMaxCount()) {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/234ddb0f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 7b3a211..84731f7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -601,6 +601,11 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
         adjustSize(-flowFiles.size(), 
-flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
     }
 
+    @Override
+    public boolean isLocalPartitionFull() {
+        return isFull(localPartition.size());
+    }
+
     /**
      * Determines which QueuePartition the given FlowFile belongs to. Must be 
called with partition read lock held.
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/234ddb0f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
index dda71de..f508d12 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java
@@ -248,13 +248,15 @@ public class StandardLoadBalanceProtocol implements 
LoadBalanceProtocol {
                     "not configured to allow for Load Balancing");
         }
 
+        final LoadBalancedFlowFileQueue loadBalancedFlowFileQueue = 
(LoadBalancedFlowFileQueue) flowFileQueue;
+
         final int spaceCheck = dataIn.read();
         if (spaceCheck < 0) {
             throw new EOFException("Expected to receive a request to determine 
whether or not space was available for Connection with ID " + connectionId + " 
from Peer " + peerDescription);
         }
 
         if (spaceCheck == CHECK_SPACE) {
-            if (flowFileQueue.isFull()) {
+            if (loadBalancedFlowFileQueue.isLocalPartitionFull()) {
                 logger.debug("Received a 'Check Space' request from Peer {} 
for Connection with ID {}; responding with QUEUE_FULL", peerDescription, 
connectionId);
                 out.write(QUEUE_FULL);
                 out.flush();

Reply via email to