ARTEMIS-1958 Artemis may not be able to delete pages when there are some empty page files
(cherry picked from commit 5ec2234010dfad8c7ff2e49f2505ea44dba9388a) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b703ca31 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b703ca31 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b703ca31 Branch: refs/heads/2.6.x Commit: b703ca3154492744e2b193262d1df5417342cd5f Parents: 16f8674 Author: 17103355 <17103...@cnsuning.com> Authored: Thu Jun 28 20:14:50 2018 +0800 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Jul 12 14:51:20 2018 -0400 ---------------------------------------------------------------------- .../cursor/impl/PageSubscriptionImpl.java | 21 +++++ .../tests/integration/paging/PagingTest.java | 93 ++++++++++++++++++++ 2 files changed, 114 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b703ca31/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index e1c9537..5fec9a4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -383,19 +383,29 @@ final class PageSubscriptionImpl implements PageSubscription { PageCache cache = cursorProvider.getPageCache(pos.getPageNr()); + PageCache emptyCache = null; if (cache != null && !cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages()) { + emptyCache = cache; + saveEmptyPageAsConsumedPage(emptyCache); // The next message is beyond what's available at the current page, so we need to move to the next page cache = null; } // it will scan for the next available page while ((cache == null && retPos.getPageNr() <= pageStore.getCurrentWritingPage()) || (cache != null && retPos.getPageNr() <= pageStore.getCurrentWritingPage() && cache.getNumberOfMessages() == 0)) { + emptyCache = cache; retPos = moveNextPage(retPos); cache = cursorProvider.getPageCache(retPos.getPageNr()); + + if (cache != null) { + saveEmptyPageAsConsumedPage(emptyCache); + } } if (cache == null) { + saveEmptyPageAsConsumedPage(emptyCache); + // it will be null in the case of the current writing page return null; } else { @@ -787,6 +797,17 @@ final class PageSubscriptionImpl implements PageSubscription { } + private void saveEmptyPageAsConsumedPage(final PageCache cache) { + if (cache != null && cache.getNumberOfMessages() == 0) { + synchronized (consumedPages) { + PageCursorInfo pageInfo = consumedPages.get(cache.getPageId()); + if (pageInfo == null) { + consumedPages.put(cache.getPageId(), new PageCursorInfo(cache.getPageId(), cache.getNumberOfMessages(), cache)); + } + } + } + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b703ca31/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index bc09fa1..566142d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -5405,6 +5405,99 @@ public class PagingTest extends ActiveMQTestBase { internalTestMultiFilters(false); } + @Test + public void testPageEmptyFile() throws Exception { + boolean persistentMessages = true; + + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + final int messageSize = 1024; + + final int numberOfMessages = 100; + + try { + ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + ClientSessionFactory sf = locator.createSessionFactory(); + + ClientSession session = sf.createSession(false, false, false); + + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); + + PagingStore store = server.getPagingManager().getPageStore(ADDRESS); + store.forceAnotherPage(); + store.forceAnotherPage(); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + + ClientMessage message = null; + + byte[] body = new byte[messageSize]; + + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(persistentMessages); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + producer.send(message); + } + + session.commit(); + + Queue queue = server.locateQueue(PagingTest.ADDRESS); + Assert.assertEquals(numberOfMessages, queue.getMessageCount()); + + store.forceAnotherPage(); + + session.start(); + + ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS); + + for (int i = 0; i < numberOfMessages; i++) { + message = consumer.receive(5000); + assertNotNull(message); + message.acknowledge(); + } + + session.commit(); + + assertNull(consumer.receiveImmediate()); + + consumer.close(); + + store.getCursorProvider().cleanup(); + + Assert.assertEquals(0, queue.getMessageCount()); + + long timeout = System.currentTimeMillis() + 5000; + while (store.isPaging() && timeout > System.currentTimeMillis()) { + Thread.sleep(100); + } + + store.getCursorProvider().cleanup(); + + sf.close(); + + locator.close(); + + Assert.assertEquals(1, store.getNumberOfPages()); + + } finally { + try { + server.stop(); + } catch (Throwable ignored) { + } + } + } + public void internalTestMultiFilters(boolean browsing) throws Throwable { clearDataRecreateServerDirs();