This is an automated email from the ASF dual-hosted git repository.

alopresto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 57c7883  NIFI-7566: Avoid using Thread.sleep() to wait for Site-to-Site connection to be handled. Instead, use TimeUnit.timedWait and use Object.notifyAll when setting the beingServiced flag. This significantly reduces latency and improves throughput for small-batch site-to-site communications
57c7883 is described below

commit 57c7883f647348aba181440950f01cf1d62846c6
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Fri Jun 19 13:19:17 2020 -0400

    NIFI-7566: Avoid using Thread.sleep() to wait for Site-to-Site connection to be handled. Instead, use TimeUnit.timedWait and use Object.notifyAll when setting the beingServiced flag. This significantly reduces latency and improves throughput for small-batch site-to-site communications
    
    This closes #4353.
    
    Signed-off-by: Andy LoPresto <alopre...@apache.org>
---
 .../org/apache/nifi/remote/StandardPublicPort.java | 62 ++++++++++++++--------
 1 file changed, 41 insertions(+), 21 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java
index 66cbc05..d1ee69f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardPublicPort.java
@@ -454,11 +454,31 @@ public class StandardPublicPort extends AbstractPort implements PublicPort {
         }
 
         public void setServiceBegin() {
-            this.beingServiced.set(true);
+            beingServiced.set(true);
+            synchronized (this) {
+                notifyAll();
+            }
         }
 
-        public boolean isBeingServiced() {
-            return beingServiced.get();
+        public boolean waitForService(final long duration, final TimeUnit timeUnit) throws InterruptedException {
+            final long latestWaitTime = System.nanoTime() + timeUnit.toNanos(duration);
+
+            while (!beingServiced.get()) {
+                final long nanosToWait = latestWaitTime - System.nanoTime();
+                if (nanosToWait <= 0) {
+                    return false;
+                }
+
+                if (isExpired()) {
+                    return false;
+                }
+
+                synchronized (this) {
+                    TimeUnit.NANOSECONDS.timedWait(this, nanosToWait);
+                }
+            }
+
+            return true;
         }
 
         public BlockingQueue<ProcessingResult> getResponseQueue() {
@@ -515,17 +535,17 @@ public class StandardPublicPort extends AbstractPort implements PublicPort {
 
             // wait for the request to start getting serviced... and time out if it doesn't happen
             // before the request expires
-            while (!request.isBeingServiced()) {
-                if (request.isExpired()) {
-                    // Remove expired request, so that it won't block new request to be offered.
-                    this.requestQueue.remove(request);
-                    throw new SocketTimeoutException("Read timed out");
-                } else {
-                    try {
-                        Thread.sleep(100L);
-                    } catch (final InterruptedException e) {
+            try {
+                while (!request.waitForService(100, TimeUnit.MILLISECONDS)) {
+                    if (request.isExpired()) {
+                        // Remove expired request, so that it won't block new request to be offered.
+                        this.requestQueue.remove(request);
+                        throw new SocketTimeoutException("Read timed out");
                     }
                 }
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new ProcessException("Interrupted while waiting for site-to-site request to be serviced", ie);
             }
 
             // we've started to service the request. Now just wait until it's finished
@@ -571,17 +591,17 @@ public class StandardPublicPort extends AbstractPort implements PublicPort {
 
             // wait for the request to start getting serviced... and time out if it doesn't happen
             // before the request expires
-            while (!request.isBeingServiced()) {
-                if (request.isExpired()) {
-                    // Remove expired request, so that it won't block new request to be offered.
-                    this.requestQueue.remove(request);
-                    throw new SocketTimeoutException("Read timed out");
-                } else {
-                    try {
-                        Thread.sleep(100L);
-                    } catch (final InterruptedException e) {
+            try {
+                while (!request.waitForService(100, TimeUnit.MILLISECONDS)) {
+                    if (request.isExpired()) {
+                        // Remove expired request, so that it won't block new request to be offered.
+                        this.requestQueue.remove(request);
+                        throw new SocketTimeoutException("Read timed out");
                     }
                 }
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new ProcessException("Interrupted while waiting for Site-to-Site request to be serviced", ie);
             }
 
             // we've started to service the request. Now just wait until it's finished

Reply via email to