This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9154cfc7ed GEODE-10226: Added monitoring of async writer (#7667)
9154cfc7ed is described below

commit 9154cfc7ed70f1accd695a7e99714d3886e29ba9
Author: Mario Ivanac <48509724+miva...@users.noreply.github.com>
AuthorDate: Wed May 25 07:32:44 2022 +0200

    GEODE-10226: Added monitoring of async writer (#7667)
    
    * GEODE-10226: Added monitoring of async writer
---
 .../apache/geode/internal/cache/DiskStoreImpl.java | 121 ++++++++++++---------
 .../internal/monitoring/ThreadsMonitoring.java     |   4 +-
 .../internal/monitoring/ThreadsMonitoringImpl.java |   4 +
 .../monitoring/executor/AbstractExecutor.java      |   4 +
 ...Executor.java => AsyncWriterExecutorGroup.java} |  28 +----
 .../monitoring/executor/SuspendableExecutor.java   |   9 ++
 .../cache/DiskStoreImplValueRecoveryTest.java      |  14 +++
 .../geode/internal/cache/FlusherThreadTest.java    |  17 +++
 .../monitoring/ThreadsMonitoringImplJUnitTest.java |   1 +
 .../monitoring/ThreadsMonitoringJUnitTest.java     |   6 +-
 .../executor/SuspendableExecutorTest.java          |  11 ++
 11 files changed, 142 insertions(+), 77 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 9dee1c1c77..0f9865b720 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -19,6 +19,7 @@ import static 
org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FIL
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static 
org.apache.geode.internal.cache.entries.DiskEntry.Helper.readRawValue;
+import static 
org.apache.geode.internal.monitoring.ThreadsMonitoring.Mode.AsyncWriterExecutor;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -113,6 +114,8 @@ import 
org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.cache.versions.VersionStamp;
 import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.util.BlobHelper;
 import org.apache.geode.logging.internal.executors.LoggingExecutors;
@@ -1649,6 +1652,10 @@ public class DiskStoreImpl implements DiskStore {
       this.diskStore = diskStore;
     }
 
+    private ThreadsMonitoring getThreadMonitoring() {
+      return 
diskStore.getCache().getInternalDistributedSystem().getDM().getThreadMonitoring();
+    }
+
     private boolean waitUntilFlushIsReady() throws InterruptedException {
       if (diskStore.maxAsyncItems > 0) {
         final long time = diskStore.getTimeInterval();
@@ -1716,67 +1723,79 @@ public class DiskStoreImpl implements DiskStore {
         logger.debug("Async writer thread started");
       }
       boolean doingFlush = false;
+      final ThreadsMonitoring threadMonitoring = getThreadMonitoring();
+      final AbstractExecutor threadMonitorExecutor =
+          threadMonitoring.createAbstractExecutor(AsyncWriterExecutor);
+      threadMonitorExecutor.suspendMonitoring();
+      threadMonitoring.register(threadMonitorExecutor);
+
       try {
         while (waitUntilFlushIsReady()) {
-          int drainCount = diskStore.fillDrainList();
-          if (drainCount > 0) {
-            Iterator<Object> it = diskStore.getDrainList().iterator();
-            while (it.hasNext()) {
-              Object o = it.next();
-              if (o instanceof FlushNotifier) {
-                flushChild();
-                if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
-                  if (!it.hasNext()) {
-                    doingFlush = false;
-                    CacheObserverHolder.getInstance().afterWritingBytes();
+          threadMonitorExecutor.resumeMonitoring();
+          try {
+            int drainCount = diskStore.fillDrainList();
+            if (drainCount > 0) {
+              Iterator<Object> it = diskStore.getDrainList().iterator();
+              while (it.hasNext()) {
+                threadMonitorExecutor.reportProgress();
+                Object o = it.next();
+                if (o instanceof FlushNotifier) {
+                  flushChild();
+                  if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
+                    if (!it.hasNext()) {
+                      doingFlush = false;
+                      CacheObserverHolder.getInstance().afterWritingBytes();
+                    }
                   }
-                }
-                ((FlushNotifier) o).doFlush();
-              } else {
-                try {
-                  AsyncDiskEntry ade = (AsyncDiskEntry) o;
-                  InternalRegion region = ade.region;
-                  VersionTag tag = ade.tag;
-                  if (ade.versionOnly) {
-                    DiskEntry.Helper.doAsyncFlush(tag, region);
-                  } else {
-                    DiskEntry entry = ade.de;
-                    // We check isPendingAsync
-                    if (entry.getDiskId().isPendingAsync()) {
-                      if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
-                        if (!doingFlush) {
-                          doingFlush = true;
-                          CacheObserverHolder.getInstance().goingToFlush();
-                        }
-                      }
-                      DiskEntry.Helper.doAsyncFlush(entry, region, tag);
+                  ((FlushNotifier) o).doFlush();
+                } else {
+                  try {
+                    AsyncDiskEntry ade = (AsyncDiskEntry) o;
+                    InternalRegion region = ade.region;
+                    VersionTag tag = ade.tag;
+                    if (ade.versionOnly) {
+                      DiskEntry.Helper.doAsyncFlush(tag, region);
                     } else {
-                      // If it is no longer pending someone called
-                      // unscheduleAsyncWrite
-                      // so we don't need to write the entry, but
-                      // if we have a version tag we need to record the
-                      // operation
-                      // to update the RVV
-                      if (tag != null) {
-                        DiskEntry.Helper.doAsyncFlush(tag, region);
+                      DiskEntry entry = ade.de;
+                      // We check isPendingAsync
+                      if (entry.getDiskId().isPendingAsync()) {
+                        if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
+                          if (!doingFlush) {
+                            doingFlush = true;
+                            CacheObserverHolder.getInstance().goingToFlush();
+                          }
+                        }
+                        DiskEntry.Helper.doAsyncFlush(entry, region, tag);
+                      } else {
+                        // If it is no longer pending someone called
+                        // unscheduleAsyncWrite
+                        // so we don't need to write the entry, but
+                        // if we have a version tag we need to record the
+                        // operation
+                        // to update the RVV
+                        if (tag != null) {
+                          DiskEntry.Helper.doAsyncFlush(tag, region);
+                        }
                       }
                     }
+                  } catch (RegionDestroyedException ignore) {
+                    // Normally we flush before closing or destroying a region
+                    // but in some cases it is closed w/o flushing.
+                    // So just ignore it; see bug 41305.
                   }
-                } catch (RegionDestroyedException ignore) {
-                  // Normally we flush before closing or destroying a region
-                  // but in some cases it is closed w/o flushing.
-                  // So just ignore it; see bug 41305.
                 }
               }
-            }
-            flushChild();
-            if (doingFlush) {
-              doingFlush = false;
-              if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
-                CacheObserverHolder.getInstance().afterWritingBytes();
+              flushChild();
+              if (doingFlush) {
+                doingFlush = false;
+                if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
+                  CacheObserverHolder.getInstance().afterWritingBytes();
+                }
               }
+              diskStore.getStats().incQueueSize(-drainCount);
             }
-            diskStore.getStats().incQueueSize(-drainCount);
+          } finally {
+            threadMonitorExecutor.suspendMonitoring();
           }
         }
       } catch (InterruptedException ie) {
@@ -1802,6 +1821,8 @@ public class DiskStoreImpl implements DiskStore {
         }
         diskStore.flusherThreadTerminated = true;
         diskStore.stopFlusher = true; // set this before calling 
handleDiskAccessException
+        threadMonitoring.unregister(threadMonitorExecutor);
+
         // or it will hang
         if (fatalDae != null) {
           diskStore.handleDiskAccessException(fatalDae);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
 
b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
index d1f8d2b0b1..3a06449306 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
@@ -29,7 +29,9 @@ public interface ThreadsMonitoring {
     ScheduledThreadExecutor,
     AGSExecutor,
     P2PReaderExecutor,
-    ServerConnectionExecutor
+    ServerConnectionExecutor,
+
+    AsyncWriterExecutor
   }
 
   Map<Long, AbstractExecutor> getMonitorMap();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
index 3be816f3db..df66945f66 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.monitoring.executor.AsyncWriterExecutorGroup;
 import 
org.apache.geode.internal.monitoring.executor.FunctionExecutionPooledExecutorGroup;
 import 
org.apache.geode.internal.monitoring.executor.GatewaySenderEventProcessorGroup;
 import org.apache.geode.internal.monitoring.executor.OneTaskOnlyExecutorGroup;
@@ -138,6 +139,9 @@ public class ThreadsMonitoringImpl implements 
ThreadsMonitoring {
         return new P2PReaderExecutorGroup();
       case ServerConnectionExecutor:
         return new ServerConnectionExecutorGroup();
+      case AsyncWriterExecutor:
+        return new AsyncWriterExecutorGroup();
+
       default:
         throw new IllegalStateException("Unhandled mode=" + mode);
     }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
index 3e8560d73b..6e3a36b862 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AbstractExecutor.java
@@ -178,6 +178,10 @@ public abstract class AbstractExecutor {
 
   public void resumeMonitoring() {}
 
+  public void reportProgress() {
+    setStartTime(System.currentTimeMillis());
+  }
+
   public boolean isMonitoringSuspended() {
     return false;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AsyncWriterExecutorGroup.java
similarity index 57%
copy from 
geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
copy to 
geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AsyncWriterExecutorGroup.java
index b64b519a44..f930416ad4 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/AsyncWriterExecutorGroup.java
@@ -14,30 +14,10 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-public abstract class SuspendableExecutor extends AbstractExecutor {
-  private volatile boolean suspended;
+public class AsyncWriterExecutorGroup extends SuspendableExecutor {
+  public static final String GROUP_NAME = "AsyncWriterExecutor";
 
-  public SuspendableExecutor(String groupName) {
-    super(groupName);
-  }
-
-  @Override
-  public void suspendMonitoring() {
-    suspended = true;
-  }
-
-  @Override
-  public void resumeMonitoring() {
-    setStartTime(0);
-    // The ThreadMonitoringProcess will set the
-    // startTime once it sees it set to 0.
-    // This prevents the monitored thread from
-    // constantly calling System.currentTimeMillis.
-    suspended = false;
-  }
-
-  @Override
-  public boolean isMonitoringSuspended() {
-    return suspended;
+  public AsyncWriterExecutorGroup() {
+    super(GROUP_NAME);
   }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
index b64b519a44..9f23609da4 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
@@ -36,6 +36,15 @@ public abstract class SuspendableExecutor extends 
AbstractExecutor {
     suspended = false;
   }
 
+  @Override
+  public void reportProgress() {
+    setStartTime(0);
+    // The ThreadMonitoringProcess will set the
+    // startTime once it sees it set to 0.
+    // This prevents the monitored thread from
+    // constantly calling System.currentTimeMillis.
+  }
+
   @Override
   public boolean isMonitoringSuspended() {
     return suspended;
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java
index fb612a8ac3..60d61b0730 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplValueRecoveryTest.java
@@ -32,7 +32,11 @@ import org.mockito.ArgumentCaptor;
 
 import org.apache.geode.Statistics;
 import org.apache.geode.StatisticsFactory;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
 
 public class DiskStoreImplValueRecoveryTest {
 
@@ -46,10 +50,20 @@ public class DiskStoreImplValueRecoveryTest {
     StatisticsFactory statisticsFactory = mock(StatisticsFactory.class);
     internalResourceManager = mock(InternalResourceManager.class);
 
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+    DistributionManager dm = mock(DistributionManager.class);
+    ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
+
     when(statisticsFactory.createStatistics(any(), 
any())).thenReturn(mock(Statistics.class));
     when(cache.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
     when(cache.getDiskStoreMonitor()).thenReturn(mock(DiskStoreMonitor.class));
 
+    when(cache.getInternalDistributedSystem()).thenReturn(ids);
+    when(ids.getDM()).thenReturn(dm);
+    when(dm.getThreadMonitoring()).thenReturn(threadsMonitoring);
+    
when(threadsMonitoring.createAbstractExecutor(any())).thenReturn(mock(AbstractExecutor.class));
+
+
     diskStore = new DiskStoreImpl(cache, "name", diskStoreAttributes, false, 
null, false, false,
         false, false, false, false, statisticsFactory,
         internalResourceManager);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java
index ad81413a90..cb69fbf33f 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -28,6 +29,10 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
 
 public class FlusherThreadTest {
 
@@ -42,6 +47,10 @@ public class FlusherThreadTest {
     diskStoreImpl = mock(DiskStoreImpl.class);
     diskStoreStats = mock(DiskStoreStats.class);
     PersistentOplogSet persistentOpLogSet = mock(PersistentOplogSet.class);
+    InternalCache cache = mock(InternalCache.class);
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+    DistributionManager dm = mock(DistributionManager.class);
+    ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
 
     when(diskStoreImpl.getAsyncMonitor()).thenReturn(new Object());
     when(diskStoreImpl.getForceFlushCount()).thenReturn(new AtomicInteger(1));
@@ -52,6 +61,14 @@ public class FlusherThreadTest {
     when(diskStoreImpl.checkAndClearForceFlush()).thenReturn(true);
     when(diskStoreImpl.isStopFlusher()).thenReturn(false).thenReturn(true);
 
+    when(diskStoreImpl.getAsyncMonitor()).thenReturn(new Object());
+    when(diskStoreImpl.getCache()).thenReturn(cache);
+    when(cache.getInternalDistributedSystem()).thenReturn(ids);
+    when(ids.getDM()).thenReturn(dm);
+    when(dm.getThreadMonitoring()).thenReturn(threadsMonitoring);
+    
when(threadsMonitoring.createAbstractExecutor(any())).thenReturn(mock(AbstractExecutor.class));
+
+
     flusherThread = new DiskStoreImpl.FlusherThread(diskStoreImpl);
   }
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
index 78af92ab7f..88f73a17ed 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
@@ -63,6 +63,7 @@ public class ThreadsMonitoringImplJUnitTest {
     assertTrue(threadsMonitoringImpl.startMonitor(Mode.AGSExecutor));
     assertTrue(threadsMonitoringImpl.startMonitor(Mode.P2PReaderExecutor));
     
assertTrue(threadsMonitoringImpl.startMonitor(Mode.ServerConnectionExecutor));
+    assertTrue(threadsMonitoringImpl.startMonitor(Mode.AsyncWriterExecutor));
   }
 
   @Test
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
index 7dd9d67c87..8d5ac0a711 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
@@ -36,11 +36,13 @@ public class ThreadsMonitoringJUnitTest {
     ScheduledThreadExecutor,
     AGSExecutor,
     P2PReaderExecutor,
-    ServerConnectionExecutor
+    ServerConnectionExecutor,
+
+    AsyncWriterExecutor
   }
 
 
-  public final int numberOfElements = 8;
+  public final int numberOfElements = 9;
   private static final Logger logger = LogService.getLogger();
 
   /**
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
index 5cc9afbbce..ee22d52473 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
@@ -50,4 +50,15 @@ public class SuspendableExecutorTest {
     executor.resumeMonitoring();
     assertThat(executor.getStartTime()).isEqualTo(0);
   }
+
+  @Test
+  public void verifyReportProgressAfterResume() {
+    SuspendableExecutor executor = new FakeSuspendableExecutor();
+    executor.resumeMonitoring();
+    assertThat(executor.getStartTime()).isEqualTo(0);
+    executor.setStartTime(11);
+    assertThat(executor.getStartTime()).isEqualTo(11);
+    executor.reportProgress();
+    assertThat(executor.getStartTime()).isEqualTo(0);
+  }
 }

Reply via email to