This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 9154cfc7ed GEODE-10226: Added monitoring of async writer (#7667) 9154cfc7ed is described below commit 9154cfc7ed70f1accd695a7e99714d3886e29ba9 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Wed May 25 07:32:44 2022 +0200 GEODE-10226: Added monitoring of async writer (#7667) * GEODE-10226: Added monitoring of async writer --- .../apache/geode/internal/cache/DiskStoreImpl.java | 121 ++++++++++++--------- .../internal/monitoring/ThreadsMonitoring.java | 4 +- .../internal/monitoring/ThreadsMonitoringImpl.java | 4 + .../monitoring/executor/AbstractExecutor.java | 4 + ...Executor.java => AsyncWriterExecutorGroup.java} | 28 +---- .../monitoring/executor/SuspendableExecutor.java | 9 ++ .../cache/DiskStoreImplValueRecoveryTest.java | 14 +++ .../geode/internal/cache/FlusherThreadTest.java | 17 +++ .../monitoring/ThreadsMonitoringImplJUnitTest.java | 1 + .../monitoring/ThreadsMonitoringJUnitTest.java | 6 +- .../executor/SuspendableExecutorTest.java | 11 ++ 11 files changed, 142 insertions(+), 77 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java index 9dee1c1c77..0f9865b720 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java @@ -19,6 +19,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FIL import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.apache.geode.internal.cache.entries.DiskEntry.Helper.readRawValue; +import static org.apache.geode.internal.monitoring.ThreadsMonitoring.Mode.AsyncWriterExecutor; import java.io.File; import java.io.FileOutputStream; @@ -113,6 +114,8 @@ import org.apache.geode.internal.cache.versions.RegionVersionVector; import org.apache.geode.internal.cache.versions.VersionSource; import org.apache.geode.internal.cache.versions.VersionStamp; import org.apache.geode.internal.cache.versions.VersionTag; +import org.apache.geode.internal.monitoring.ThreadsMonitoring; +import org.apache.geode.internal.monitoring.executor.AbstractExecutor; import org.apache.geode.internal.serialization.KnownVersion; import org.apache.geode.internal.util.BlobHelper; import org.apache.geode.logging.internal.executors.LoggingExecutors; @@ -1649,6 +1652,10 @@ public class DiskStoreImpl implements DiskStore { this.diskStore = diskStore; } + private ThreadsMonitoring getThreadMonitoring() { + return diskStore.getCache().getInternalDistributedSystem().getDM().getThreadMonitoring(); + } + private boolean waitUntilFlushIsReady() throws InterruptedException { if (diskStore.maxAsyncItems > 0) { final long time = diskStore.getTimeInterval(); @@ -1716,67 +1723,79 @@ public class DiskStoreImpl implements DiskStore { logger.debug("Async writer thread started"); } boolean doingFlush = false; + final ThreadsMonitoring threadMonitoring = getThreadMonitoring(); + final AbstractExecutor threadMonitorExecutor = + threadMonitoring.createAbstractExecutor(AsyncWriterExecutor); + threadMonitorExecutor.suspendMonitoring(); + threadMonitoring.register(threadMonitorExecutor); + try { while (waitUntilFlushIsReady()) { - int drainCount = diskStore.fillDrainList(); - if (drainCount > 0) { - Iterator<Object> it = diskStore.getDrainList().iterator(); - while (it.hasNext()) { - Object o = it.next(); - if (o instanceof FlushNotifier) { - flushChild(); - if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { - if (!it.hasNext()) { - doingFlush = false; - CacheObserverHolder.getInstance().afterWritingBytes(); + threadMonitorExecutor.resumeMonitoring(); + try { + int drainCount = diskStore.fillDrainList(); + if (drainCount > 0) { + Iterator<Object> it = diskStore.getDrainList().iterator(); + while (it.hasNext()) { + threadMonitorExecutor.reportProgress(); + Object o = it.next(); + if (o instanceof FlushNotifier) { + flushChild(); + if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { + if (!it.hasNext()) { + doingFlush = false; + CacheObserverHolder.getInstance().afterWritingBytes(); + } } - } - ((FlushNotifier) o).doFlush(); - } else { - try { - AsyncDiskEntry ade = (AsyncDiskEntry) o; - InternalRegion region = ade.region; - VersionTag tag = ade.tag; - if (ade.versionOnly) { - DiskEntry.Helper.doAsyncFlush(tag, region); - } else { - DiskEntry entry = ade.de; - // We check isPendingAsync - if (entry.getDiskId().isPendingAsync()) { - if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { - if (!doingFlush) { - doingFlush = true; - CacheObserverHolder.getInstance().goingToFlush(); - } - } - DiskEntry.Helper.doAsyncFlush(entry, region, tag); + ((FlushNotifier) o).doFlush(); + } else { + try { + AsyncDiskEntry ade = (AsyncDiskEntry) o; + InternalRegion region = ade.region; + VersionTag tag = ade.tag; + if (ade.versionOnly) { + DiskEntry.Helper.doAsyncFlush(tag, region); } else { - // If it is no longer pending someone called - // unscheduleAsyncWrite - // so we don't need to write the entry, but - // if we have a version tag we need to record the - // operation - // to update the RVV - if (tag != null) { - DiskEntry.Helper.doAsyncFlush(tag, region); + DiskEntry entry = ade.de; + // We check isPendingAsync + if (entry.getDiskId().isPendingAsync()) { + if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { + if (!doingFlush) { + doingFlush = true; + CacheObserverHolder.getInstance().goingToFlush(); + } + } + DiskEntry.Helper.doAsyncFlush(entry, region, tag); + } else { + // If it is no longer pending someone called + // unscheduleAsyncWrite + // so we don't need to write the entry, but + // if we have a version tag we need to record the + // operation + // to update the RVV + if (tag != null) { + DiskEntry.Helper.doAsyncFlush(tag, region); + } } } + } catch (RegionDestroyedException ignore) { + // Normally we flush before closing or destroying a region + // but in some cases it is closed w/o flushing. + // So just ignore it; see bug 41305. } - } catch (RegionDestroyedException ignore) { - // Normally we flush before closing or destroying a region - // but in some cases it is closed w/o flushing. - // So just ignore it; see bug 41305. } } - } - flushChild(); - if (doingFlush) { - doingFlush = false; - if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { - CacheObserverHolder.getInstance().afterWritingBytes(); + flushChild(); + if (doingFlush) { + doingFlush = false; + if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) { + CacheObserverHolder.getInstance().afterWritingBytes(); + } } + diskStore.getStats().incQueueSize(-drainCount); } - diskStore.getStats().incQueueSize(-drainCount); + } finally { + threadMonitorExecutor.suspendMonitoring(); } } } catch (InterruptedException ie) { @@ -1802,6 +1821,8 @@ public class DiskStoreImpl implements DiskStore { } diskStore.flusherThreadTerminated = true; diskStore.stopFlusher = true; // set this before calling handleDiskAccessException + threadMonitoring.unregister(threadMonitorExecutor); + // or it will hang if (fatalDae != null) { diskStore.handleDiskAccessException(fatalDae); diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java index d1f8d2b0b1..3a06449306 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java +++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java @@ -29,7 +29,9 @@ public interface ThreadsMonitoring { ScheduledThreadExecutor, AGSExecutor, P2PReaderExecutor, - ServerConnectionExecutor + ServerConnectionExecutor, + + AsyncWriterExecutor } Map<Long, AbstractExecutor> getMonitorMap(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java index 3be816f3db..df66945f66 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.monitoring.executor.AbstractExecutor; +import org.apache.geode.internal.monitoring.executor.AsyncWriterExecutorGroup; import org.apache.geode.internal.monitoring.executor.FunctionExecutionPooledExecutorGroup; import org.apache.geode.internal.monitoring.executor.GatewaySenderEventProcessorGroup; import org.apache.geode.internal.monitoring.executor.OneTaskOnlyExecutorGroup; @@ -138,6 +139,9 @@ public class ThreadsMonitoringImpl implements ThreadsMonitoring { return new P2PReaderExecutorGroup(); case ServerConnectionExecutor: return new ServerConnectionExecutorGroup(); + case AsyncWriterExecutor: + return new AsyncWriterExecutorGroup(); + default: throw new IllegalStateException("Unhandled mode=" + mode); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java index 3e8560d73b..6e3a36b862 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java @@ -178,6 +178,10 @@ public abstract class AbstractExecutor { public void resumeMonitoring() {} + public void reportProgress() { + setStartTime(System.currentTimeMillis()); + } + public boolean isMonitoringSuspended() { return false; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AsyncWriterExecutorGroup.java similarity index 57% copy from geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java copy to geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AsyncWriterExecutorGroup.java index b64b519a44..f930416ad4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AsyncWriterExecutorGroup.java @@ -14,30 +14,10 @@ */ package org.apache.geode.internal.monitoring.executor; -public abstract class SuspendableExecutor extends AbstractExecutor { - private volatile boolean suspended; +public class AsyncWriterExecutorGroup extends SuspendableExecutor { + public static final String GROUP_NAME = "AsyncWriterExecutor"; - public SuspendableExecutor(String groupName) { - super(groupName); - } - - @Override - public void suspendMonitoring() { - suspended = true; - } - - @Override - public void resumeMonitoring() { - setStartTime(0); - // The ThreadMonitoringProcess will set the - // startTime once it sees it set to 0. - // This prevents the monitored thread from - // constantly calling System.currentTimeMillis. - suspended = false; - } - - @Override - public boolean isMonitoringSuspended() { - return suspended; + public AsyncWriterExecutorGroup() { + super(GROUP_NAME); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java index b64b519a44..9f23609da4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java @@ -36,6 +36,15 @@ public abstract class SuspendableExecutor extends AbstractExecutor { suspended = false; } + @Override + public void reportProgress() { + setStartTime(0); + // The ThreadMonitoringProcess will set the + // startTime once it sees it set to 0. + // This prevents the monitored thread from + // constantly calling System.currentTimeMillis. + } + @Override public boolean isMonitoringSuspended() { return suspended; diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java index fb612a8ac3..60d61b0730 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java @@ -32,7 +32,11 @@ import org.mockito.ArgumentCaptor; import org.apache.geode.Statistics; import org.apache.geode.StatisticsFactory; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.cache.control.InternalResourceManager; +import org.apache.geode.internal.monitoring.ThreadsMonitoring; +import org.apache.geode.internal.monitoring.executor.AbstractExecutor; public class DiskStoreImplValueRecoveryTest { @@ -46,10 +50,20 @@ public class DiskStoreImplValueRecoveryTest { StatisticsFactory statisticsFactory = mock(StatisticsFactory.class); internalResourceManager = mock(InternalResourceManager.class); + InternalDistributedSystem ids = mock(InternalDistributedSystem.class); + DistributionManager dm = mock(DistributionManager.class); + ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class); + when(statisticsFactory.createStatistics(any(), any())).thenReturn(mock(Statistics.class)); when(cache.getCachePerfStats()).thenReturn(mock(CachePerfStats.class)); when(cache.getDiskStoreMonitor()).thenReturn(mock(DiskStoreMonitor.class)); + when(cache.getInternalDistributedSystem()).thenReturn(ids); + when(ids.getDM()).thenReturn(dm); + when(dm.getThreadMonitoring()).thenReturn(threadsMonitoring); + when(threadsMonitoring.createAbstractExecutor(any())).thenReturn(mock(AbstractExecutor.class)); + + diskStore = new DiskStoreImpl(cache, "name", diskStoreAttributes, false, null, false, false, false, false, false, false, statisticsFactory, internalResourceManager); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java index ad81413a90..cb69fbf33f 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java @@ -14,6 +14,7 @@ */ package org.apache.geode.internal.cache; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -28,6 +29,10 @@ import org.junit.Before; import org.junit.Test; import org.apache.geode.cache.DiskAccessException; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.internal.monitoring.ThreadsMonitoring; +import org.apache.geode.internal.monitoring.executor.AbstractExecutor; public class FlusherThreadTest { @@ -42,6 +47,10 @@ public class FlusherThreadTest { diskStoreImpl = mock(DiskStoreImpl.class); diskStoreStats = mock(DiskStoreStats.class); PersistentOplogSet persistentOpLogSet = mock(PersistentOplogSet.class); + InternalCache cache = mock(InternalCache.class); + InternalDistributedSystem ids = mock(InternalDistributedSystem.class); + DistributionManager dm = mock(DistributionManager.class); + ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class); when(diskStoreImpl.getAsyncMonitor()).thenReturn(new Object()); when(diskStoreImpl.getForceFlushCount()).thenReturn(new AtomicInteger(1)); @@ -52,6 +61,14 @@ public class FlusherThreadTest { when(diskStoreImpl.checkAndClearForceFlush()).thenReturn(true); when(diskStoreImpl.isStopFlusher()).thenReturn(false).thenReturn(true); + when(diskStoreImpl.getAsyncMonitor()).thenReturn(new Object()); + when(diskStoreImpl.getCache()).thenReturn(cache); + when(cache.getInternalDistributedSystem()).thenReturn(ids); + when(ids.getDM()).thenReturn(dm); + when(dm.getThreadMonitoring()).thenReturn(threadsMonitoring); + when(threadsMonitoring.createAbstractExecutor(any())).thenReturn(mock(AbstractExecutor.class)); + + flusherThread = new DiskStoreImpl.FlusherThread(diskStoreImpl); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java index 78af92ab7f..88f73a17ed 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java @@ -63,6 +63,7 @@ public class ThreadsMonitoringImplJUnitTest { assertTrue(threadsMonitoringImpl.startMonitor(Mode.AGSExecutor)); assertTrue(threadsMonitoringImpl.startMonitor(Mode.P2PReaderExecutor)); assertTrue(threadsMonitoringImpl.startMonitor(Mode.ServerConnectionExecutor)); + assertTrue(threadsMonitoringImpl.startMonitor(Mode.AsyncWriterExecutor)); } @Test diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java index 7dd9d67c87..8d5ac0a711 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java @@ -36,11 +36,13 @@ public class ThreadsMonitoringJUnitTest { ScheduledThreadExecutor, AGSExecutor, P2PReaderExecutor, - ServerConnectionExecutor + ServerConnectionExecutor, + + AsyncWriterExecutor } - public final int numberOfElements = 8; + public final int numberOfElements = 9; private static final Logger logger = LogService.getLogger(); /** diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java index 5cc9afbbce..ee22d52473 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java @@ -50,4 +50,15 @@ public class SuspendableExecutorTest { executor.resumeMonitoring(); assertThat(executor.getStartTime()).isEqualTo(0); } + + @Test + public void verifyReportProgressAfterResume() { + SuspendableExecutor executor = new FakeSuspendableExecutor(); + executor.resumeMonitoring(); + assertThat(executor.getStartTime()).isEqualTo(0); + executor.setStartTime(11); + assertThat(executor.getStartTime()).isEqualTo(11); + executor.reportProgress(); + assertThat(executor.getStartTime()).isEqualTo(0); + } }