[geode] 01/01: GEODE-4624: Add a new stat for AsyncEventQueue/GatewaySender to track the processing of queueRemovals
This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-4624 in repository https://gitbox.apache.org/repos/asf/geode.git commit 3cf97755a8cda26d90d563a82b196639805cdc37 Author: zhouxh AuthorDate: Wed Mar 21 23:20:27 2018 -0700 GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of queueRemovals --- .../asyncqueue/internal/AsyncEventQueueStats.java | 5 ++ .../internal/cache/wan/AbstractGatewaySender.java | 15 +- .../wan/AbstractGatewaySenderEventProcessor.java | 62 -- .../internal/cache/wan/GatewaySenderStats.java | 16 ++ .../ConcurrentParallelGatewaySenderQueue.java | 9 .../wan/parallel/ParallelGatewaySenderQueue.java | 18 ++- .../geode/internal/cache/wan/WANTestBase.java | 41 +- .../ParallelGatewaySenderOperationsDUnitTest.java | 38 ++--- 8 files changed, 179 insertions(+), 25 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java index dee2c92..b8259a3 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java @@ -47,6 +47,9 @@ public class AsyncEventQueueStats extends GatewaySenderStats { f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, "Number of events received but not added to the event queue because the queue already contains an event with the event's key.", "operations"), +f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"), +f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER, +"Number of events not added to primary queue due to sender yet runing.", "events"), f.createIntCounter(EVENTS_CONFLATED_FROM_BATCHES, "Number of events conflated from batches.", "operations"), f.createIntCounter(EVENTS_DISTRIBUTED, @@ -122,6 
+125,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats { unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE); conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE); notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS); +notQueuedEventsAtYetRunningPrimarySenderId = +type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER); eventsFilteredId = type.nameToId(EVENTS_FILTERED); eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES); loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index a134e1e..76c1e24 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // If this gateway is not running, return if (!isRunning()) { if (isDebugEnabled) { -logger.debug("Returning back without putting into the gateway sender queue"); +logger.debug("Returning back without putting into the gateway sender queue" + event); + } + if (this.eventProcessor != null) { +this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // The sender may have stopped, after we have checked the status in the beginning. 
if (!isRunning()) { if (isDebugEnabled) { -logger.debug("Returning back without putting into the gateway sender queue"); +logger.debug("Returning back without putting into the gateway sender queue" + event); + } + if (this.eventProcessor != null) { +this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return localProcessor == null ? 0 : localProcessor.eventQueueSize(); } + public int getEventSecondaryQueueSize() { +AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor; +return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize(); + } + public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) { this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents; } d
[geode] 01/01: GEODE-4624: Add a new stat for AsyncEventQueue/GatewaySender to track the processing of queueRemovals
This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-4624 in repository https://gitbox.apache.org/repos/asf/geode.git commit 219543ebe2150efd9fa9e3c7925de8cb0a584931 Author: zhouxh AuthorDate: Wed Mar 21 23:20:27 2018 -0700 GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of queueRemovals --- .../asyncqueue/internal/AsyncEventQueueStats.java | 5 ++ .../internal/cache/wan/AbstractGatewaySender.java | 15 +- .../wan/AbstractGatewaySenderEventProcessor.java | 62 -- .../internal/cache/wan/GatewaySenderStats.java | 16 ++ .../ConcurrentParallelGatewaySenderQueue.java | 9 .../wan/parallel/ParallelGatewaySenderQueue.java | 18 ++- .../geode/internal/cache/wan/WANTestBase.java | 41 +- .../ParallelGatewaySenderOperationsDUnitTest.java | 62 +++--- 8 files changed, 191 insertions(+), 37 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java index dee2c92..b8259a3 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java @@ -47,6 +47,9 @@ public class AsyncEventQueueStats extends GatewaySenderStats { f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED, "Number of events received but not added to the event queue because the queue already contains an event with the event's key.", "operations"), +f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"), +f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER, +"Number of events not added to primary queue due to sender yet runing.", "events"), f.createIntCounter(EVENTS_CONFLATED_FROM_BATCHES, "Number of events conflated from batches.", "operations"), f.createIntCounter(EVENTS_DISTRIBUTED, @@ 
-122,6 +125,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats { unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE); conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE); notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS); +notQueuedEventsAtYetRunningPrimarySenderId = +type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER); eventsFilteredId = type.nameToId(EVENTS_FILTERED); eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES); loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index a134e1e..76c1e24 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // If this gateway is not running, return if (!isRunning()) { if (isDebugEnabled) { -logger.debug("Returning back without putting into the gateway sender queue"); +logger.debug("Returning back without putting into the gateway sender queue" + event); + } + if (this.eventProcessor != null) { +this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // The sender may have stopped, after we have checked the status in the beginning. 
if (!isRunning()) { if (isDebugEnabled) { -logger.debug("Returning back without putting into the gateway sender queue"); +logger.debug("Returning back without putting into the gateway sender queue" + event); + } + if (this.eventProcessor != null) { +this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return localProcessor == null ? 0 : localProcessor.eventQueueSize(); } + public int getEventSecondaryQueueSize() { +AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor; +return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize(); + } + public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) { this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvent
[geode] 01/01: GEODE-4624: Add a new stat for AsyncEventQueue/GatewaySender to track the processing of queueRemovals
This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-4624 in repository https://gitbox.apache.org/repos/asf/geode.git commit c08ebf1edd217734633656262cea0b2503ab74de Author: zhouxh AuthorDate: Wed Mar 21 23:20:27 2018 -0700 GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of queueRemovals --- .../internal/cache/wan/AbstractGatewaySender.java | 15 +- .../wan/AbstractGatewaySenderEventProcessor.java | 61 -- .../ConcurrentParallelGatewaySenderQueue.java | 9 .../wan/parallel/ParallelGatewaySenderQueue.java | 18 ++- .../geode/internal/cache/wan/WANTestBase.java | 26 - .../ParallelGatewaySenderOperationsDUnitTest.java | 24 - 6 files changed, 122 insertions(+), 31 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index a134e1e..76c1e24 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // If this gateway is not running, return if (!isRunning()) { if (isDebugEnabled) { -logger.debug("Returning back without putting into the gateway sender queue"); +logger.debug("Returning back without putting into the gateway sender queue" + event); + } + if (this.eventProcessor != null) { +this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // The sender may have stopped, after we have checked the status in the beginning. 
if (!isRunning()) { if (isDebugEnabled) { -logger.debug("Returning back without putting into the gateway sender queue"); +logger.debug("Returning back without putting into the gateway sender queue" + event); + } + if (this.eventProcessor != null) { +this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return localProcessor == null ? 0 : localProcessor.eventQueueSize(); } + public int getEventSecondaryQueueSize() { +AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor; +return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize(); + } + public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) { this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 9309e43..7524203 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -31,25 +31,12 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; import org.apache.geode.GemFireException; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.CacheException; -import org.apache.geode.cache.EntryEvent; -import org.apache.geode.cache.Operation; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionDestroyedException; +import org.apache.geode.cache.*; import org.apache.geode.cache.wan.GatewayEventFilter; import org.apache.geode.cache.wan.GatewayQueueEvent; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.internal.DistributionConfig; -import 
org.apache.geode.internal.cache.BucketRegion; -import org.apache.geode.internal.cache.Conflatable; -import org.apache.geode.internal.cache.DistributedRegion; -import org.apache.geode.internal.cache.EntryEventImpl; -import org.apache.geode.internal.cache.EnumListenerEvent; -import org.apache.geode.internal.cache.EventID; -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.RegionQueue; +import org.apache.geode.internal.cache.*; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue; @@ -270,6 +257
[geode] 01/01: GEODE-4624: Add a new stat for AsyncEventQueue/GatewaySender to track the processing of queueRemovals
This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-4624 in repository https://gitbox.apache.org/repos/asf/geode.git commit f10946aeb283a405ccebc6c5c733c58590f6cadc Author: zhouxh AuthorDate: Wed Mar 21 23:20:27 2018 -0700 GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of queueRemovals --- .../internal/cache/wan/AbstractGatewaySender.java | 15 +++- .../wan/AbstractGatewaySenderEventProcessor.java | 44 ++ .../ConcurrentParallelGatewaySenderQueue.java | 9 + .../wan/parallel/ParallelGatewaySenderQueue.java | 18 - .../geode/internal/cache/wan/WANTestBase.java | 26 - .../ParallelGatewaySenderOperationsDUnitTest.java | 24 ++-- 6 files changed, 120 insertions(+), 16 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index a134e1e..76c1e24 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // If this gateway is not running, return if (!isRunning()) { if (isDebugEnabled) { -logger.debug("Returning back without putting into the gateway sender queue"); +logger.debug("Returning back without putting into the gateway sender queue" + event); + } + if (this.eventProcessor != null) { +this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi // The sender may have stopped, after we have checked the status in the beginning. 
if (!isRunning()) { if (isDebugEnabled) { -logger.debug("Returning back without putting into the gateway sender queue"); +logger.debug("Returning back without putting into the gateway sender queue" + event); + } + if (this.eventProcessor != null) { +this.eventProcessor.registerEventDroppedInPrimaryQueue(event); } return; } @@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return localProcessor == null ? 0 : localProcessor.eventQueueSize(); } + public int getEventSecondaryQueueSize() { +AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor; +return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize(); + } + public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) { this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 9309e43..badafb0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -270,6 +270,50 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread { return this.queue.size(); } + public int eventSecondaryQueueSize() { +if (queue == null) { + return 0; +} + +// if parallel, get both primary and secondary queues' size, then substract primary queue's size +if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) { + int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true) + - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false); + return size; +} +return this.queue.size(); + } + + public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) { +if (queue == null) { + return; +} +if 
(this.queue instanceof ConcurrentParallelGatewaySenderQueue) { + ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue; + PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath()); + if (prQ == null) { +if (logger.isDebugEnabled()) { + logger.debug("shadow partitioned region " + event.getRegion().getFullPath() + + " is not created yet."); +} +return; + } + int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event); + long shadowKey = event.getTailKey(); + + ParallelGatewaySenderQueue pgsq = + (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId); + boolean isPrimary = prQ.getRegio