[geode] 01/01: GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of queueRemovals

2018-04-09 Thread zhouxj
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 3cf97755a8cda26d90d563a82b196639805cdc37
Author: zhouxh 
AuthorDate: Wed Mar 21 23:20:27 2018 -0700

GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the 
processing of queueRemovals
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |  5 ++
 .../internal/cache/wan/AbstractGatewaySender.java  | 15 +-
 .../wan/AbstractGatewaySenderEventProcessor.java   | 62 --
 .../internal/cache/wan/GatewaySenderStats.java | 16 ++
 .../ConcurrentParallelGatewaySenderQueue.java  |  9 
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 18 ++-
 .../geode/internal/cache/wan/WANTestBase.java  | 41 +-
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 38 ++---
 8 files changed, 179 insertions(+), 25 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index dee2c92..b8259a3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -47,6 +47,9 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
 f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
 "Number of events received but not added to the event queue 
because the queue already contains an event with the event's key.",
 "operations"),
+f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added 
to queue.", "events"),
+f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
+"Number of events not added to primary queue due to sender yet 
runing.", "events"),
 f.createIntCounter(EVENTS_CONFLATED_FROM_BATCHES,
 "Number of events conflated from batches.", "operations"),
 f.createIntCounter(EVENTS_DISTRIBUTED,
@@ -122,6 +125,8 @@ public class AsyncEventQueueStats extends 
GatewaySenderStats {
 unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
 conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
 notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
+notQueuedEventsAtYetRunningPrimarySenderId =
+type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
 eventsFilteredId = type.nameToId(EVENTS_FILTERED);
 eventsConflatedFromBatchesId = 
type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
 loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a134e1e..76c1e24 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 // If this gateway is not running, return
 if (!isRunning()) {
   if (isDebugEnabled) {
-logger.debug("Returning back without putting into the gateway sender 
queue");
+logger.debug("Returning back without putting into the gateway sender 
queue" + event);
+  }
+  if (this.eventProcessor != null) {
+this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
   }
   return;
 }
@@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 // The sender may have stopped, after we have checked the status in 
the beginning.
 if (!isRunning()) {
   if (isDebugEnabled) {
-logger.debug("Returning back without putting into the gateway 
sender queue");
+logger.debug("Returning back without putting into the gateway 
sender queue" + event);
+  }
+  if (this.eventProcessor != null) {
+this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
   }
   return;
 }
@@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 return localProcessor == null ? 0 : localProcessor.eventQueueSize();
   }
 
+  public int getEventSecondaryQueueSize() {
+AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+return localProcessor == null ? 0 : 
localProcessor.eventSecondaryQueueSize();
+  }
+
   public void setEnqueuedAllTempQueueEvents(boolean 
enqueuedAllTempQueueEvents) {
 this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
   }
d

[geode] 01/01: GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of queueRemovals

2018-04-09 Thread zhouxj
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 219543ebe2150efd9fa9e3c7925de8cb0a584931
Author: zhouxh 
AuthorDate: Wed Mar 21 23:20:27 2018 -0700

GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the 
processing of queueRemovals
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |  5 ++
 .../internal/cache/wan/AbstractGatewaySender.java  | 15 +-
 .../wan/AbstractGatewaySenderEventProcessor.java   | 62 --
 .../internal/cache/wan/GatewaySenderStats.java | 16 ++
 .../ConcurrentParallelGatewaySenderQueue.java  |  9 
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 18 ++-
 .../geode/internal/cache/wan/WANTestBase.java  | 41 +-
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 62 +++---
 8 files changed, 191 insertions(+), 37 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index dee2c92..b8259a3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -47,6 +47,9 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
 f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
 "Number of events received but not added to the event queue 
because the queue already contains an event with the event's key.",
 "operations"),
+f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added 
to queue.", "events"),
+f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
+"Number of events not added to primary queue due to sender yet 
runing.", "events"),
 f.createIntCounter(EVENTS_CONFLATED_FROM_BATCHES,
 "Number of events conflated from batches.", "operations"),
 f.createIntCounter(EVENTS_DISTRIBUTED,
@@ -122,6 +125,8 @@ public class AsyncEventQueueStats extends 
GatewaySenderStats {
 unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
 conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
 notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
+notQueuedEventsAtYetRunningPrimarySenderId =
+type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
 eventsFilteredId = type.nameToId(EVENTS_FILTERED);
 eventsConflatedFromBatchesId = 
type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
 loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a134e1e..76c1e24 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 // If this gateway is not running, return
 if (!isRunning()) {
   if (isDebugEnabled) {
-logger.debug("Returning back without putting into the gateway sender 
queue");
+logger.debug("Returning back without putting into the gateway sender 
queue" + event);
+  }
+  if (this.eventProcessor != null) {
+this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
   }
   return;
 }
@@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 // The sender may have stopped, after we have checked the status in 
the beginning.
 if (!isRunning()) {
   if (isDebugEnabled) {
-logger.debug("Returning back without putting into the gateway 
sender queue");
+logger.debug("Returning back without putting into the gateway 
sender queue" + event);
+  }
+  if (this.eventProcessor != null) {
+this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
   }
   return;
 }
@@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 return localProcessor == null ? 0 : localProcessor.eventQueueSize();
   }
 
+  public int getEventSecondaryQueueSize() {
+AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+return localProcessor == null ? 0 : 
localProcessor.eventSecondaryQueueSize();
+  }
+
   public void setEnqueuedAllTempQueueEvents(boolean 
enqueuedAllTempQueueEvents) {
 this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvent

[geode] 01/01: GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of queueRemovals

2018-04-05 Thread zhouxj
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c08ebf1edd217734633656262cea0b2503ab74de
Author: zhouxh 
AuthorDate: Wed Mar 21 23:20:27 2018 -0700

GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the 
processing of queueRemovals
---
 .../internal/cache/wan/AbstractGatewaySender.java  | 15 +-
 .../wan/AbstractGatewaySenderEventProcessor.java   | 61 --
 .../ConcurrentParallelGatewaySenderQueue.java  |  9 
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 18 ++-
 .../geode/internal/cache/wan/WANTestBase.java  | 26 -
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 24 -
 6 files changed, 122 insertions(+), 31 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a134e1e..76c1e24 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 // If this gateway is not running, return
 if (!isRunning()) {
   if (isDebugEnabled) {
-logger.debug("Returning back without putting into the gateway sender 
queue");
+logger.debug("Returning back without putting into the gateway sender 
queue" + event);
+  }
+  if (this.eventProcessor != null) {
+this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
   }
   return;
 }
@@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 // The sender may have stopped, after we have checked the status in 
the beginning.
 if (!isRunning()) {
   if (isDebugEnabled) {
-logger.debug("Returning back without putting into the gateway 
sender queue");
+logger.debug("Returning back without putting into the gateway 
sender queue" + event);
+  }
+  if (this.eventProcessor != null) {
+this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
   }
   return;
 }
@@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 return localProcessor == null ? 0 : localProcessor.eventQueueSize();
   }
 
+  public int getEventSecondaryQueueSize() {
+AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+return localProcessor == null ? 0 : 
localProcessor.eventSecondaryQueueSize();
+  }
+
   public void setEnqueuedAllTempQueueEvents(boolean 
enqueuedAllTempQueueEvents) {
 this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 9309e43..7524203 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -31,25 +31,12 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelException;
 import org.apache.geode.GemFireException;
 import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.CacheException;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.Operation;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.*;
 import org.apache.geode.cache.wan.GatewayEventFilter;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.BucketRegion;
-import org.apache.geode.internal.cache.Conflatable;
-import org.apache.geode.internal.cache.DistributedRegion;
-import org.apache.geode.internal.cache.EntryEventImpl;
-import org.apache.geode.internal.cache.EnumListenerEvent;
-import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.*;
 import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
 import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderQueue;
@@ -270,6 +257

[geode] 01/01: GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the processing of queueRemovals

2018-04-05 Thread zhouxj
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit f10946aeb283a405ccebc6c5c733c58590f6cadc
Author: zhouxh 
AuthorDate: Wed Mar 21 23:20:27 2018 -0700

GEODE-4624: Add a new stat for AyncEventQueue/GatewaySender to track the 
processing of queueRemovals
---
 .../internal/cache/wan/AbstractGatewaySender.java  | 15 +++-
 .../wan/AbstractGatewaySenderEventProcessor.java   | 44 ++
 .../ConcurrentParallelGatewaySenderQueue.java  |  9 +
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 18 -
 .../geode/internal/cache/wan/WANTestBase.java  | 26 -
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 24 ++--
 6 files changed, 120 insertions(+), 16 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a134e1e..76c1e24 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 // If this gateway is not running, return
 if (!isRunning()) {
   if (isDebugEnabled) {
-logger.debug("Returning back without putting into the gateway sender 
queue");
+logger.debug("Returning back without putting into the gateway sender 
queue" + event);
+  }
+  if (this.eventProcessor != null) {
+this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
   }
   return;
 }
@@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 // The sender may have stopped, after we have checked the status in 
the beginning.
 if (!isRunning()) {
   if (isDebugEnabled) {
-logger.debug("Returning back without putting into the gateway 
sender queue");
+logger.debug("Returning back without putting into the gateway 
sender queue" + event);
+  }
+  if (this.eventProcessor != null) {
+this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
   }
   return;
 }
@@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
 return localProcessor == null ? 0 : localProcessor.eventQueueSize();
   }
 
+  public int getEventSecondaryQueueSize() {
+AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+return localProcessor == null ? 0 : 
localProcessor.eventSecondaryQueueSize();
+  }
+
   public void setEnqueuedAllTempQueueEvents(boolean 
enqueuedAllTempQueueEvents) {
 this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 9309e43..badafb0 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -270,6 +270,50 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends Thread {
 return this.queue.size();
   }
 
+  public int eventSecondaryQueueSize() {
+if (queue == null) {
+  return 0;
+}
+
+// if parallel, get both primary and secondary queues' size, then 
substract primary queue's size
+if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+  int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true)
+  - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false);
+  return size;
+}
+return this.queue.size();
+  }
+
+  public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
+if (queue == null) {
+  return;
+}
+if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+  ConcurrentParallelGatewaySenderQueue cpgsq = 
(ConcurrentParallelGatewaySenderQueue) queue;
+  PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
+  if (prQ == null) {
+if (logger.isDebugEnabled()) {
+  logger.debug("shadow partitioned region " + 
event.getRegion().getFullPath()
+  + " is not created yet.");
+}
+return;
+  }
+  int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) 
event);
+  long shadowKey = event.getTailKey();
+
+  ParallelGatewaySenderQueue pgsq =
+  (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
+  boolean isPrimary = 
prQ.getRegio