dschneider-pivotal commented on a change in pull request #6764:
URL: https://github.com/apache/geode/pull/6764#discussion_r692547669



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -2251,48 +2237,54 @@ void checkQueueSizeConstraint() throws 
InterruptedException {
         if (Thread.interrupted()) {
           throw new InterruptedException();
         }
-        synchronized (this.putGuard) {
-          if (putPermits <= 0) {
-            synchronized (this.permitMon) {
-              if (reconcilePutPermits() <= 0) {
-                if 
(region.getSystem().getConfig().getRemoveUnresponsiveClient()) {
-                  isClientSlowReceiver = true;
-                } else {
-                  try {
-                    long logFrequency = 
CacheClientNotifier.DEFAULT_LOG_FREQUENCY;
-                    CacheClientNotifier ccn = 
CacheClientNotifier.getInstance();
-                    if (ccn != null) { // check needed for junit tests
-                      logFrequency = ccn.getLogFrequency();
-                    }
-                    if ((this.maxQueueSizeHitCount % logFrequency) == 0) {
-                      logger.warn("Client queue for {} client is full.",
-                          new Object[] {region.getName()});
-                      this.maxQueueSizeHitCount = 0;
-                    }
-                    ++this.maxQueueSizeHitCount;
-                    this.region.checkReadiness(); // fix for bug 37581
-                    // TODO: wait called while holding two locks
-                    
this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime);
-                    this.region.checkReadiness(); // fix for bug 37581
-                    // Fix for #51400. Allow the queue to grow beyond its
-                    // capacity/maxQueueSize, if it is taking a long time to
-                    // drain the queue, either due to a slower client or the
-                    // deadlock scenario mentioned in the ticket.
-                    reconcilePutPermits();
-                    if ((this.maxQueueSizeHitCount % logFrequency) == 1) {
-                      logger.info("Resuming with processing puts ...");
-                    }
-                  } catch (InterruptedException ex) {
-                    // TODO: The line below is meaningless. Comment it out 
later
-                    this.permitMon.notifyAll();
-                    throw ex;
-                  }
-                }
-              }
-            } // synchronized (this.permitMon)
-          } // if (putPermits <= 0)
-          --putPermits;
-        } // synchronized (this.putGuard)
+        long startTime = System.currentTimeMillis();
+        if (putPermits.get() <= 0) {
+          synchronized (this.permitMon) {
+            long duration = (System.currentTimeMillis() - startTime);
+            checkQueueSizeConstraintCore(duration);
+          } // synchronized (this.permitMon)
+        } // if (putPermits <= 0)
+        putPermits.decrementAndGet();
+      }
+    }
+
+    /* Do not call this method directly from anywhere except 
checkQueueSizeConstraint */
+    private void checkQueueSizeConstraintCore(long duration) throws 
InterruptedException {
+      if (reconcilePutPermits() <= 0) {

Review comment:
       Since (in my design) the caller has already decremented putPermits, we 
can stop waiting in this method as soon as reconcilePutPermits() >= 0. So I 
think you want this comparison to be < 0.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -2318,7 +2310,7 @@ private int reconcilePutPermits() {
     @Override
     void incrementTakeSidePutPermitsWithoutNotify() {
       synchronized (this.permitMon) {

Review comment:
       Look into whether this sync is still needed. It was needed before because 
all writes of takeSidePutPermits were done while synced on permitMon. But now 
that you have made takeSidePutPermits atomic, and since this method does not 
notify, the sync may no longer be needed. 

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -2251,48 +2237,54 @@ void checkQueueSizeConstraint() throws 
InterruptedException {
         if (Thread.interrupted()) {
           throw new InterruptedException();
         }
-        synchronized (this.putGuard) {
-          if (putPermits <= 0) {
-            synchronized (this.permitMon) {
-              if (reconcilePutPermits() <= 0) {
-                if 
(region.getSystem().getConfig().getRemoveUnresponsiveClient()) {
-                  isClientSlowReceiver = true;
-                } else {
-                  try {
-                    long logFrequency = 
CacheClientNotifier.DEFAULT_LOG_FREQUENCY;
-                    CacheClientNotifier ccn = 
CacheClientNotifier.getInstance();
-                    if (ccn != null) { // check needed for junit tests
-                      logFrequency = ccn.getLogFrequency();
-                    }
-                    if ((this.maxQueueSizeHitCount % logFrequency) == 0) {
-                      logger.warn("Client queue for {} client is full.",
-                          new Object[] {region.getName()});
-                      this.maxQueueSizeHitCount = 0;
-                    }
-                    ++this.maxQueueSizeHitCount;
-                    this.region.checkReadiness(); // fix for bug 37581
-                    // TODO: wait called while holding two locks
-                    
this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime);
-                    this.region.checkReadiness(); // fix for bug 37581
-                    // Fix for #51400. Allow the queue to grow beyond its
-                    // capacity/maxQueueSize, if it is taking a long time to
-                    // drain the queue, either due to a slower client or the
-                    // deadlock scenario mentioned in the ticket.
-                    reconcilePutPermits();
-                    if ((this.maxQueueSizeHitCount % logFrequency) == 1) {
-                      logger.info("Resuming with processing puts ...");
-                    }
-                  } catch (InterruptedException ex) {
-                    // TODO: The line below is meaningless. Comment it out 
later
-                    this.permitMon.notifyAll();
-                    throw ex;
-                  }
-                }
-              }
-            } // synchronized (this.permitMon)
-          } // if (putPermits <= 0)
-          --putPermits;
-        } // synchronized (this.putGuard)
+        long startTime = System.currentTimeMillis();
+        if (putPermits.get() <= 0) {
+          synchronized (this.permitMon) {

Review comment:
       since checkQueueSizeConstraintCore does a wait on permitMon, I think it 
would be better for the sync on permitMon to also be in 
checkQueueSizeConstraintCore. I would also move all the time calls into 
checkQueueSizeConstraintCore. Maybe change its name to waitForPermission since 
we only call it when we run out of permits.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -2302,9 +2294,9 @@ void checkQueueSizeConstraint() throws 
InterruptedException {
      * @return int current Put permits
      */
     private int reconcilePutPermits() {
-      putPermits += takeSidePutPermits;
-      takeSidePutPermits = 0;
-      return putPermits;
+      putPermits.addAndGet(takeSidePutPermits.get());

Review comment:
       I think you have a concurrency bug between takeSidePutPermits.get() and 
the next line, takeSidePutPermits.set(0).
   If some other thread increments takeSidePutPermits between these two 
statements, then that increment is lost.
   I think you want to rewrite this code like so:
   
   int takes = takeSidePutPermits.getAndSet(0);
   return putPermits.addAndGet(takes);
   

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -2251,48 +2237,54 @@ void checkQueueSizeConstraint() throws 
InterruptedException {
         if (Thread.interrupted()) {
           throw new InterruptedException();
         }
-        synchronized (this.putGuard) {
-          if (putPermits <= 0) {
-            synchronized (this.permitMon) {
-              if (reconcilePutPermits() <= 0) {
-                if 
(region.getSystem().getConfig().getRemoveUnresponsiveClient()) {
-                  isClientSlowReceiver = true;
-                } else {
-                  try {
-                    long logFrequency = 
CacheClientNotifier.DEFAULT_LOG_FREQUENCY;
-                    CacheClientNotifier ccn = 
CacheClientNotifier.getInstance();
-                    if (ccn != null) { // check needed for junit tests
-                      logFrequency = ccn.getLogFrequency();
-                    }
-                    if ((this.maxQueueSizeHitCount % logFrequency) == 0) {
-                      logger.warn("Client queue for {} client is full.",
-                          new Object[] {region.getName()});
-                      this.maxQueueSizeHitCount = 0;
-                    }
-                    ++this.maxQueueSizeHitCount;
-                    this.region.checkReadiness(); // fix for bug 37581
-                    // TODO: wait called while holding two locks
-                    
this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime);
-                    this.region.checkReadiness(); // fix for bug 37581
-                    // Fix for #51400. Allow the queue to grow beyond its
-                    // capacity/maxQueueSize, if it is taking a long time to
-                    // drain the queue, either due to a slower client or the
-                    // deadlock scenario mentioned in the ticket.
-                    reconcilePutPermits();
-                    if ((this.maxQueueSizeHitCount % logFrequency) == 1) {
-                      logger.info("Resuming with processing puts ...");
-                    }
-                  } catch (InterruptedException ex) {
-                    // TODO: The line below is meaningless. Comment it out 
later
-                    this.permitMon.notifyAll();
-                    throw ex;
-                  }
-                }
-              }
-            } // synchronized (this.permitMon)
-          } // if (putPermits <= 0)
-          --putPermits;
-        } // synchronized (this.putGuard)
+        long startTime = System.currentTimeMillis();
+        if (putPermits.get() <= 0) {

Review comment:
       Instead of putPermits.get() here, I think it should be 
putPermits.decrementAndGet(). You also need to get rid of the decrementAndGet 
call at the end (line 2247).
   Also, the System.currentTimeMillis() call should come after the 
decrementAndGet check.
   You can just change this code to if (putPermits.decrementAndGet() >= 0) 
return; Note that if it is zero after the decrement we are still good. We only 
need to wait if it drops below 0.
   So the idea is that it first decrements the atomic and checks whether the 
result is >= 0. If it is, it's done. If not, it drops into your current if 
block that syncs and calls checkQueueSizeConstraintCore. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to