This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new c362f77 GEODE-8497: added getTotalQueueSizeBytesInUse (#5514) c362f77 is described below commit c362f77591fa83e5f87a056fe221d1241de87348 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Mon Oct 19 08:02:55 2020 +0200 GEODE-8497: added getTotalQueueSizeBytesInUse (#5514) * GEODE-8497: added getTotalQueueSizeBytesInUse * GEODE-8497: updated UT * GEODE-8497: fix fail statistics after restart --- .../geode/internal/cache/RegionMapFactory.java | 9 ++++++++- .../geode/management/GatewaySenderMXBean.java | 5 +++++ .../internal/beans/GatewaySenderMBean.java | 5 +++++ .../internal/beans/GatewaySenderMBeanBridge.java | 6 ++++++ .../beans/stats/GatewaySenderOverflowMonitor.java | 21 +++++++++++++++++++++ .../management/internal/beans/stats/StatsKey.java | 2 ++ .../stats/GatewaySenderOverflowMonitorTest.java | 18 +++++++++++++++++- 7 files changed, 64 insertions(+), 2 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapFactory.java index 48e64fe..92c85fe 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionMapFactory.java @@ -15,7 +15,7 @@ package org.apache.geode.internal.cache; - +import org.apache.geode.internal.cache.eviction.EvictionController; /** * Used to produce instances of RegionMap @@ -39,6 +39,13 @@ class RegionMapFactory { // eviction tests to fail return new ProxyRegionMap(owner, attrs, internalRegionArgs); } else if (owner.isEntryEvictionPossible()) { + if (owner instanceof PartitionedRegion) { + PartitionedRegion pr = (PartitionedRegion) owner; + EvictionController evctrl = pr.getPREvictionControllerFromDiskInitialization(); + if (evctrl != null) { + return new VMLRURegionMap(owner, attrs, internalRegionArgs, evctrl); + } + } return new VMLRURegionMap(owner, attrs, internalRegionArgs); } else { return new VMRegionMap(owner, attrs, internalRegionArgs); diff --git a/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java b/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java index 67c1350..a889dd5 100644 --- a/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java +++ b/geode-core/src/main/java/org/apache/geode/management/GatewaySenderMXBean.java @@ -177,6 +177,11 @@ public interface GatewaySenderMXBean { int getTotalBatchesRedistributed(); /** + * Returns the total number of bytes in heap occupied by the event queue. + */ + long getTotalQueueSizeBytesInUse(); + + /** * Starts this GatewaySender. Once the GatewaySender is running its configuration cannot be * changed. * diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java index a2d1251..1f422ff 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBean.java @@ -134,6 +134,11 @@ public class GatewaySenderMBean extends NotificationBroadcasterSupport } @Override + public long getTotalQueueSizeBytesInUse() { + return bridge.getTotalQueueSizeBytesInUse(); + } + + @Override public boolean isBatchConflationEnabled() { return bridge.isBatchConflationEnabled(); } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java index 42b4bbf..7eda2b4 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewaySenderMBeanBridge.java @@ -291,6 +291,12 @@ public class GatewaySenderMBeanBridge { .longValue(); } + public long getTotalQueueSizeBytesInUse() { + return overflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY) + .longValue(); + } + + private Number getStatistic(String statName) { if (monitor != null) { return monitor.getStatistic(statName); diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java index 24055fa..fb38cd1 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java @@ -47,6 +47,8 @@ public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor { private volatile long lruEvictions = 0; private volatile long bytesOverflowedToDisk = 0; private volatile long entriesOverflowedToDisk = 0; + private volatile long bytesInUse = 0; + private final Map<Statistics, ValueMonitor> monitors; private final Map<Statistics, StatisticsListener> listeners; @@ -58,6 +60,10 @@ public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor { return bytesOverflowedToDisk; } + public long getTotalQueueSizeBytesInUse() { + return bytesInUse; + } + public long getEntriesOverflowedToDisk() { return entriesOverflowedToDisk; } @@ -93,6 +99,11 @@ public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor { return currentValue.longValue() - prevValue.longValue(); } + if (name.equals(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)) { + Number prevValue = statsMap.getOrDefault(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY, 0); + return currentValue.longValue() - prevValue.longValue(); + } + return 0; } @@ -111,6 +122,12 @@ public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor { bytesOverflowedToDisk += value.longValue(); return; } + + if (name.equals(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)) { + bytesInUse += value.longValue(); + return; + } + } @Override @@ -127,6 +144,10 @@ public class GatewaySenderOverflowMonitor extends MBeanStatsMonitor { return getBytesOverflowedToDisk(); } + if (name.equals(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)) { + return getTotalQueueSizeBytesInUse(); + } + return 0; } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java index d8c24e3..be65916 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/StatsKey.java @@ -309,6 +309,8 @@ public class StatsKey { public static final String GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK = "entriesOnlyOnDisk"; public static final String GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK = "bytesOnlyOnDisk"; + public static final String GATEWAYSENDER_BYTES_IN_MEMORY = "byteCount"; + /** AsyncEventQueue Stats **/ public static final String ASYNCEVENTQUEUE_EVENTS_QUEUE_SIZE = "eventQueueSize"; diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java index aefdc18..dccabea 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitorTest.java @@ -79,6 +79,7 @@ public class GatewaySenderOverflowMonitorTest { assertThat(gatewaySenderOverflowMonitor.getLruEvictions()).isEqualTo(0); assertThat(gatewaySenderOverflowMonitor.getBytesOverflowedToDisk()).isEqualTo(0); assertThat(gatewaySenderOverflowMonitor.getEntriesOverflowedToDisk()).isEqualTo(0); + assertThat(gatewaySenderOverflowMonitor.getTotalQueueSizeBytesInUse()).isEqualTo(0); } @Test @@ -94,6 +95,7 @@ public class GatewaySenderOverflowMonitorTest { statsMap.put(StatsKey.GATEWAYSENDER_LRU_EVICTIONS, 50); statsMap.put(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK, 2048); statsMap.put(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK, 100); + statsMap.put(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY, 1024); assertThat(gatewaySenderOverflowMonitor.computeDelta(statsMap, StatsKey.GATEWAYSENDER_LRU_EVICTIONS, 60)).isEqualTo(10L); @@ -101,6 +103,9 @@ public class GatewaySenderOverflowMonitorTest { StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK, 2100)).isEqualTo(52L); assertThat(gatewaySenderOverflowMonitor.computeDelta(statsMap, StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK, 150)).isEqualTo(50L); + assertThat(gatewaySenderOverflowMonitor.computeDelta(statsMap, + StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY, 2048)).isEqualTo(1024L); + } @Test @@ -110,6 +115,8 @@ public class GatewaySenderOverflowMonitorTest { 1024L); gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK, 10000L); + gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY, + 4096L); assertThat(gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_LRU_EVICTIONS)) .isEqualTo(5L); assertThat( @@ -117,12 +124,16 @@ public class GatewaySenderOverflowMonitorTest { .isEqualTo(1024L); assertThat(gatewaySenderOverflowMonitor .getStatistic(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)).isEqualTo(10000L); + assertThat(gatewaySenderOverflowMonitor + .getStatistic(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)).isEqualTo(4096L); gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_LRU_EVICTIONS, 5L); gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_OVERFLOWED_TO_DISK, 1024L); gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK, 10000L); + gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY, + 2048L); assertThat(gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_LRU_EVICTIONS)) .isEqualTo(10L); assertThat( @@ -130,6 +141,8 @@ public class GatewaySenderOverflowMonitorTest { .isEqualTo(2048L); assertThat(gatewaySenderOverflowMonitor .getStatistic(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)).isEqualTo(20000L); + assertThat(gatewaySenderOverflowMonitor + .getStatistic(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)).isEqualTo(6144L); } @Test @@ -144,7 +157,8 @@ public class GatewaySenderOverflowMonitorTest { 2048); gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK, 10000); - + gatewaySenderOverflowMonitor.increaseStats(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY, + 2048); assertThat(gatewaySenderOverflowMonitor.getStatistic(StatsKey.GATEWAYSENDER_LRU_EVICTIONS)) .isEqualTo(5L); assertThat( @@ -152,6 +166,8 @@ public class GatewaySenderOverflowMonitorTest { .isEqualTo(2048L); assertThat(gatewaySenderOverflowMonitor .getStatistic(StatsKey.GATEWAYSENDER_ENTRIES_OVERFLOWED_TO_DISK)).isEqualTo(10000L); + assertThat(gatewaySenderOverflowMonitor + .getStatistic(StatsKey.GATEWAYSENDER_BYTES_IN_MEMORY)).isEqualTo(2048L); } @Test