This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new f2bac5a  ARTEMIS-2321 Non-blocking Page::read on page cache
     new 56e33bc  This closes #2667
f2bac5a is described below

commit f2bac5ad08dc2b31628b622e68056d0e813e989e
Author: Francesco Nigro <nigro....@gmail.com>
AuthorDate: Thu Apr 25 22:30:53 2019 +0200

    ARTEMIS-2321 Non-blocking Page::read on page cache
---
 artemis-server/pom.xml                             |   5 +
 .../paging/cursor/impl/PageCursorProviderImpl.java | 106 ++++++++++++++++-----
 .../artemis/core/persistence/StorageManager.java   |   7 ++
 .../journal/AbstractJournalStorageManager.java     |   9 ++
 .../impl/nullpm/NullStorageManager.java            |   6 ++
 .../cursor/impl/PageCursorProviderImplTest.java    |  77 +++++++++++++++
 .../core/transaction/impl/TransactionImplTest.java |   6 ++
 .../org.mockito.plugins.MockMaker                  |   1 +
 pom.xml                                            |   2 +
 .../tests/integration/client/SendAckFailTest.java  |   5 +
 10 files changed, 202 insertions(+), 22 deletions(-)

diff --git a/artemis-server/pom.xml b/artemis-server/pom.xml
index 6ab2967..e20179f 100644
--- a/artemis-server/pom.xml
+++ b/artemis-server/pom.xml
@@ -189,6 +189,11 @@
          <artifactId>derby</artifactId>
          <scope>test</scope>
       </dependency>
+      <dependency>
+         <groupId>org.mockito</groupId>
+         <artifactId>mockito-core</artifactId>
+         <scope>test</scope>
+      </dependency>
    </dependencies>
 
    <profiles>
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index ddca7f2..4e03b3b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -19,9 +19,12 @@ package org.apache.activemq.artemis.core.paging.cursor.impl;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import io.netty.util.collection.LongObjectHashMap;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
@@ -71,8 +74,19 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
 
    private final SoftValueLongObjectHashMap<PageCache> softCache;
 
+   private final LongObjectHashMap<CompletableFuture<PageCache>> 
inProgressReadPages;
+
    private final ConcurrentLongHashMap<PageSubscription> activeCursors = new 
ConcurrentLongHashMap<>();
 
+   private static final long PAGE_READ_TIMEOUT_NS = 
TimeUnit.SECONDS.toNanos(30);
+
+   //Any concurrent read page request will wait in a loop for the original 
Page::read to complete, while
+   //printing a warn message at intervals
+   private static final long CONCURRENT_PAGE_READ_TIMEOUT_NS = 
TimeUnit.SECONDS.toNanos(10);
+
+   //storageManager.beforePageRead will be attempted in a loop, printing a 
warn message at intervals
+   private static final long PAGE_READ_PERMISSION_TIMEOUT_NS = 
TimeUnit.SECONDS.toNanos(10);
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -85,6 +99,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
       this.storageManager = storageManager;
       this.executor = executor;
       this.softCache = new SoftValueLongObjectHashMap<>(maxCacheSize);
+      this.inProgressReadPages = new LongObjectHashMap<>();
    }
 
    // Public --------------------------------------------------------
@@ -131,43 +146,82 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
    @Override
    public PageCache getPageCache(final long pageId) {
       try {
+         if (pageId > pagingStore.getCurrentWritingPage()) {
+            return null;
+         }
+         boolean createPage = false;
+         CompletableFuture<PageCache> inProgressReadPage;
          PageCache cache;
+         Page page = null;
          synchronized (softCache) {
-            if (pageId > pagingStore.getCurrentWritingPage()) {
+            cache = softCache.get(pageId);
+            if (cache != null) {
+               return cache;
+            }
+            if (!pagingStore.checkPageFileExists((int) pageId)) {
                return null;
             }
-
-            cache = softCache.get(pageId);
-            if (cache == null) {
-               if (!pagingStore.checkPageFileExists((int) pageId)) {
-                  return null;
-               }
-
+            inProgressReadPage = inProgressReadPages.get(pageId);
+            if (inProgressReadPage == null) {
+               final CompletableFuture<PageCache> readPage = new 
CompletableFuture<>();
                cache = createPageCache(pageId);
-               // anyone reading from this cache will have to wait reading to 
finish first
-               // we also want only one thread reading this cache
-               logger.tracef("adding pageCache pageNr=%d into cursor = %s", 
pageId, this.pagingStore.getAddress());
-               readPage((int) pageId, cache);
-               softCache.put(pageId, cache);
+               page = pagingStore.createPage((int) pageId);
+               createPage = true;
+               inProgressReadPage = readPage;
+               inProgressReadPages.put(pageId, readPage);
+            }
+         }
+         if (createPage) {
+            return readPage(pageId, page, cache, inProgressReadPage);
+         } else {
+            final long startedWait = System.nanoTime();
+            while (true) {
+               try {
+                  return 
inProgressReadPage.get(CONCURRENT_PAGE_READ_TIMEOUT_NS, TimeUnit.NANOSECONDS);
+               } catch (TimeoutException e) {
+                  final long elapsed = System.nanoTime() - startedWait;
+                  final long elapsedMillis = 
TimeUnit.NANOSECONDS.toMillis(elapsed);
+                  logger.warnf("Waiting a concurrent Page::read for pageNr=%d 
on cursor %s by %d ms",
+                               pageId, pagingStore.getAddress(), 
elapsedMillis);
+               }
             }
          }
-
-         return cache;
       } catch (Exception e) {
          throw new RuntimeException(e.getMessage(), e);
       }
    }
 
-   private void readPage(int pageId, PageCache cache) throws Exception {
-      Page page = null;
+   private PageCache readPage(long pageId,
+                              Page page,
+                              PageCache cache,
+                              CompletableFuture<PageCache> inProgressReadPage) 
throws Exception {
+      logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, 
this.pagingStore.getAddress());
+      boolean acquiredPageReadPermission = false;
       try {
-         page = pagingStore.createPage(pageId);
-
-         storageManager.beforePageRead();
+         final long startedRequest = System.nanoTime();
+         while (!acquiredPageReadPermission) {
+            acquiredPageReadPermission = 
storageManager.beforePageRead(PAGE_READ_PERMISSION_TIMEOUT_NS, 
TimeUnit.NANOSECONDS);
+            if (!acquiredPageReadPermission) {
+               final long elapsedMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedRequest);
+               logger.warnf("Cannot acquire page read permission of pageNr=%d 
on cursor %s after %d ms: consider increasing page-max-concurrent-io or use a 
faster disk",
+                            pageId, pagingStore.getAddress(), elapsedMillis);
+            }
+         }
          page.open();
-
+         final long startedReadPage = System.nanoTime();
          List<PagedMessage> pgdMessages = page.read(storageManager);
+         final long elapsedReadPage = System.nanoTime() - startedReadPage;
+         if (elapsedReadPage > PAGE_READ_TIMEOUT_NS) {
+            logger.warnf("Page::read for pageNr=%d on cursor %s tooks %d ms to 
read %d bytes", pageId,
+                         pagingStore.getAddress(), 
TimeUnit.NANOSECONDS.toMillis(elapsedReadPage), page.getSize());
+         }
          cache.setMessages(pgdMessages.toArray(new 
PagedMessage[pgdMessages.size()]));
+      } catch (Throwable t) {
+         inProgressReadPage.completeExceptionally(t);
+         synchronized (softCache) {
+            inProgressReadPages.remove(pageId);
+         }
+         throw t;
       } finally {
          try {
             if (page != null) {
@@ -175,8 +229,16 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
             }
          } catch (Throwable ignored) {
          }
-         storageManager.afterPageRead();
+         if (acquiredPageReadPermission) {
+            storageManager.afterPageRead();
+         }
+      }
+      inProgressReadPage.complete(cache);
+      synchronized (softCache) {
+         inProgressReadPages.remove(pageId);
+         softCache.put(pageId, cache);
       }
+      return cache;
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index d025d5e..73c43fe 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -147,6 +148,12 @@ public interface StorageManager extends IDGenerator, 
ActiveMQComponent {
    void beforePageRead() throws Exception;
 
    /**
+    * Like {@link #beforePageRead()}, but returns {@code true} if acquired 
within {@code timeout},
+    * or {@code false} otherwise.
+    */
+   boolean beforePageRead(long timeout, TimeUnit unit) throws 
InterruptedException;
+
+   /**
     * We need a safeguard in place to avoid too much concurrent IO happening 
on Paging, otherwise
     * the system may become unresponsive if too many destinations are reading 
all the same time.
     * This is called after we read, so we can limit concurrent reads
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 1b92e86..fd14d55 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1633,6 +1633,15 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
    }
 
    @Override
+   public boolean beforePageRead(long timeout, TimeUnit unit) throws 
InterruptedException {
+      final Semaphore pageMaxConcurrentIO = this.pageMaxConcurrentIO;
+      if (pageMaxConcurrentIO == null) {
+         return true;
+      }
+      return pageMaxConcurrentIO.tryAcquire(timeout, unit);
+   }
+
+   @Override
    public void afterPageRead() throws Exception {
       if (pageMaxConcurrentIO != null) {
          pageMaxConcurrentIO.release();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 995e57b..577ce8b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.Message;
@@ -578,6 +579,11 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
+   public boolean beforePageRead(long timeout, TimeUnit unit) throws 
InterruptedException {
+      return true;
+   }
+
+   @Override
    public void afterPageRead() throws Exception {
    }
 
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
new file mode 100644
index 0000000..4ed38e8
--- /dev/null
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImplTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.paging.cursor.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PageCache;
+import org.apache.activemq.artemis.core.paging.impl.Page;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static java.util.Collections.emptyList;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PageCursorProviderImplTest {
+
+   @Test(timeout = 30_000)
+   public void shouldAllowConcurrentPageReads() throws Exception {
+      final PagingStore pagingStore = mock(PagingStore.class);
+      final StorageManager storageManager = mock(StorageManager.class);
+      when(storageManager.beforePageRead(anyLong(), 
any(TimeUnit.class))).thenReturn(true);
+      final int pages = 2;
+      final ArtemisExecutor artemisExecutor = mock(ArtemisExecutor.class);
+      final PageCursorProviderImpl pageCursorProvider = new 
PageCursorProviderImpl(pagingStore, storageManager, artemisExecutor, 2);
+      when(pagingStore.getCurrentWritingPage()).thenReturn(pages);
+      when(pagingStore.checkPageFileExists(anyInt())).thenReturn(true);
+      final Page firstPage = mock(Page.class);
+      when(firstPage.getPageId()).thenReturn(1);
+      when(pagingStore.createPage(1)).thenReturn(firstPage);
+      final Page secondPage = mock(Page.class);
+      when(secondPage.getPageId()).thenReturn(2);
+      when(pagingStore.createPage(2)).thenReturn(secondPage);
+      final CountDownLatch finishFirstPageRead = new CountDownLatch(1);
+      final Thread concurrentRead = new Thread(() -> {
+         try {
+            final PageCache cache = pageCursorProvider.getPageCache(2);
+            Assert.assertNotNull(cache);
+         } finally {
+            finishFirstPageRead.countDown();
+         }
+      });
+      try {
+         when(firstPage.read(storageManager)).then(invocationOnMock -> {
+            concurrentRead.start();
+            finishFirstPageRead.await();
+            return emptyList();
+         });
+         Assert.assertNotNull(pageCursorProvider.getPageCache(1));
+      } finally {
+         pageCursorProvider.stop();
+         concurrentRead.interrupt();
+         concurrentRead.join();
+      }
+   }
+
+}
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index b51be9a..0a15022 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -290,6 +291,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public boolean beforePageRead(long timeout, TimeUnit unit) throws 
InterruptedException {
+         return true;
+      }
+
+      @Override
       public void afterPageRead() throws Exception {
 
       }
diff --git 
a/artemis-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
 
b/artemis-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..ca6ee9c
--- /dev/null
+++ 
b/artemis-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b32a6b0..3c7b1a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1631,6 +1631,8 @@
                   
<exclude>activemq-artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h</exclude>
                   <exclude>**/dependency-reduced-pom.xml</exclude>
 
+                  <!-- Mockito -->
+                  
<exclude>**/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker</exclude>
                </excludes>
             </configuration>
             <executions>
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index b9b686c..9364aa0 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -366,6 +366,11 @@ public class SendAckFailTest extends SpawnedTestBase {
       }
 
       @Override
+      public boolean beforePageRead(long timeout, TimeUnit unit) throws 
InterruptedException {
+         return manager.beforePageRead(timeout, unit);
+      }
+
+      @Override
       public void afterPageRead() throws Exception {
          manager.afterPageRead();
       }

Reply via email to