mhansonp commented on a change in pull request #6764:
URL: https://github.com/apache/geode/pull/6764#discussion_r705613957
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
##########
@@ -2251,48 +2237,54 @@ void checkQueueSizeConstraint() throws
InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
- synchronized (this.putGuard) {
- if (putPermits <= 0) {
- synchronized (this.permitMon) {
- if (reconcilePutPermits() <= 0) {
- if
(region.getSystem().getConfig().getRemoveUnresponsiveClient()) {
- isClientSlowReceiver = true;
- } else {
- try {
- long logFrequency =
CacheClientNotifier.DEFAULT_LOG_FREQUENCY;
- CacheClientNotifier ccn =
CacheClientNotifier.getInstance();
- if (ccn != null) { // check needed for junit tests
- logFrequency = ccn.getLogFrequency();
- }
- if ((this.maxQueueSizeHitCount % logFrequency) == 0) {
- logger.warn("Client queue for {} client is full.",
- new Object[] {region.getName()});
- this.maxQueueSizeHitCount = 0;
- }
- ++this.maxQueueSizeHitCount;
- this.region.checkReadiness(); // fix for bug 37581
- // TODO: wait called while holding two locks
-
this.permitMon.wait(CacheClientNotifier.eventEnqueueWaitTime);
- this.region.checkReadiness(); // fix for bug 37581
- // Fix for #51400. Allow the queue to grow beyond its
- // capacity/maxQueueSize, if it is taking a long time to
- // drain the queue, either due to a slower client or the
- // deadlock scenario mentioned in the ticket.
- reconcilePutPermits();
- if ((this.maxQueueSizeHitCount % logFrequency) == 1) {
- logger.info("Resuming with processing puts ...");
- }
- } catch (InterruptedException ex) {
- // TODO: The line below is meaningless. Comment it out
later
- this.permitMon.notifyAll();
- throw ex;
- }
- }
- }
- } // synchronized (this.permitMon)
- } // if (putPermits <= 0)
- --putPermits;
- } // synchronized (this.putGuard)
+ long startTime = System.currentTimeMillis();
+ if (putPermits.get() <= 0) {
+ synchronized (this.permitMon) {
+ long duration = (System.currentTimeMillis() - startTime);
+ checkQueueSizeConstraintCore(duration);
+ } // synchronized (this.permitMon)
+ } // if (putPermits <= 0)
+ putPermits.decrementAndGet();
+ }
+ }
+
+ /* Do not call this method directly from anywhere except
checkQueueSizeConstraint */
+ private void checkQueueSizeConstraintCore(long duration) throws
InterruptedException {
+ if (reconcilePutPermits() <= 0) {
Review comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org