ARTEMIS-1333 SendACK listener message loss (adding test)

The next commit should contain the fix.
This test is added first so that anyone reviewing can easily confirm the fix.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/96c6268f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/96c6268f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/96c6268f

Branch: refs/heads/master
Commit: 96c6268f5a68a293589ac0061d561265d9e79972
Parents: 8bc15b1
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Tue Aug 8 22:39:20 2017 -0400
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Aug 9 15:18:54 2017 -0400

----------------------------------------------------------------------
 .../core/server/impl/ActiveMQServerImpl.java    |   5 +-
 .../amqp/JMSMessageConsumerTest.java            |  11 +-
 .../integration/client/SendAckFailTest.java     | 775 +++++++++++++++++++
 3 files changed, 782 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96c6268f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 8d9ce99..ce1f354 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import javax.management.MBeanServer;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -47,8 +48,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-import javax.management.MBeanServer;
-
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -2098,7 +2097,7 @@ public class ActiveMQServerImpl implements ActiveMQServer 
{
    /**
     * This method is protected as it may be used as a hook for creating a 
custom storage manager (on tests for instance)
     */
-   private StorageManager createStorageManager() {
+   protected StorageManager createStorageManager() {
       if (configuration.isPersistenceEnabled()) {
          if (configuration.getStoreConfiguration() != null && 
configuration.getStoreConfiguration().getStoreType() == 
StoreConfiguration.StoreType.DATABASE) {
             JDBCJournalStorageManager journal = new 
JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), 
getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96c6268f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
index c5372ac..68a9801 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -34,6 +28,11 @@ import javax.jms.MessageProducer;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.tests.util.Wait;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/96c6268f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
new file mode 100644
index 0000000..292b5c4
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
+import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
+import org.apache.activemq.artemis.core.paging.PagedMessage;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.persistence.AddressBindingInfo;
+import org.apache.activemq.artemis.core.persistence.GroupingInfo;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
+import org.apache.activemq.artemis.core.persistence.QueueStatus;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import 
org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
+import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
+import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.RouteContextList;
+import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
+import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.JournalLoader;
+import org.apache.activemq.artemis.core.transaction.ResourceManager;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SendAckFailTest extends ActiveMQTestBase {
+
+   @Before
+   @After
+   public void deleteDirectory() throws Exception {
+      deleteDirectory(new File("./target/send-ack"));
+   }
+
+   @Override
+   public String getJournalDir(final int index, final boolean backup) {
+      new Exception("trace").printStackTrace(System.out);
+      return "./target/send-ack/journal";
+   }
+
+   @Override
+   protected String getBindingsDir(final int index, final boolean backup) {
+      return "./target/send-ack/binding";
+   }
+
+   @Override
+   protected String getPageDir(final int index, final boolean backup) {
+      return "./target/send-ack/page";
+   }
+
+   @Override
+   protected String getLargeMessagesDir(final int index, final boolean backup) 
{
+      return "./target/send-ack/large-message";
+   }
+
+   @Test
+   public void testSend() throws Exception {
+      Process process = 
SpawnedVMSupport.spawnVM(SendAckFailTest.class.getName());
+      ActiveMQServer server = null;
+
+      try {
+
+         HashSet<Integer> listSent = new HashSet<>();
+
+         Thread t = null;
+         {
+            ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory();
+            ServerLocator locator = factory.getServerLocator();
+
+            
locator.setConfirmationWindowSize(0).setInitialConnectAttempts(100).setRetryInterval(100).setBlockOnDurableSend(false).setReconnectAttempts(0);
+
+            ClientSessionFactory sf = locator.createSessionFactory();
+
+            ClientSession session = sf.createSession();
+            session.createAddress(SimpleString.toSimpleString("T1"), 
RoutingType.ANYCAST, true);
+            session.createQueue(SimpleString.toSimpleString("T1"), 
RoutingType.ANYCAST, SimpleString.toSimpleString("T1"), true);
+
+            ClientProducer producer = session.createProducer("T1");
+
+            session.setSendAcknowledgementHandler(new 
SendAcknowledgementHandler() {
+               @Override
+               public void sendAcknowledged(Message message) {
+                  listSent.add(message.getIntProperty("myid"));
+               }
+            });
+
+            t = new Thread() {
+               @Override
+               public void run() {
+                  for (int i = 0; i < 5000; i++) {
+                     try {
+                        
producer.send(session.createMessage(true).putIntProperty("myid", i));
+                     } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                     }
+                  }
+               }
+            };
+            t.start();
+         }
+
+         Wait.waitFor(() -> listSent.size() > 100, 5000, 10);
+
+         Assert.assertTrue(process.waitFor(1, TimeUnit.MINUTES));
+
+         server = startServer(false);
+
+         {
+            ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory();
+            ServerLocator locator = factory.getServerLocator();
+
+            ClientSessionFactory sf = locator.createSessionFactory();
+
+            ClientSession session = sf.createSession();
+
+            ClientConsumer consumer = session.createConsumer("T1");
+
+            session.start();
+
+            for (int i = 0; i < listSent.size(); i++) {
+               ClientMessage message = consumer.receive(1000);
+               if (message == null) {
+                  for (Integer msgi : listSent) {
+                     System.out.println("Message " + msgi + " was lost");
+                  }
+                  fail("missed messages!");
+               }
+               message.acknowledge();
+
+               if (!listSent.remove(message.getIntProperty("myid"))) {
+                  System.out.println("Message " + message + " with id " + 
message.getIntProperty("myid") + " received in duplicate");
+                  fail("Message " + message + " with id " + 
message.getIntProperty("myid") + " received in duplicate");
+               }
+            }
+         }
+
+      } finally {
+         if (process != null) {
+            process.destroy();
+         }
+         if (server != null) {
+            server.stop();
+         }
+      }
+   }
+
+   public static void main(String[] arg) {
+      SendAckFailTest test = new SendAckFailTest();
+      test.startServer(true);
+   }
+
+   public ActiveMQServer startServer(boolean fail) {
+      try {
+         //ActiveMQServerImpl server = (ActiveMQServerImpl) createServer(true, 
true);
+
+         AtomicInteger count = new AtomicInteger(0);
+
+         ActiveMQSecurityManager securityManager = new 
ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new 
SecurityConfiguration());
+
+         Configuration configuration = createDefaultConfig(true);
+
+         ActiveMQServer server = new ActiveMQServerImpl(configuration, 
ManagementFactory.getPlatformMBeanServer(), securityManager) {
+            @Override
+            public StorageManager createStorageManager() {
+               StorageManager original = super.createStorageManager();
+
+               return new StorageManagerDelegate(original) {
+                  @Override
+                  public void storeMessage(Message message) throws Exception {
+
+                     if (fail) {
+                        if (count.incrementAndGet() == 110) {
+                           System.out.println("Failing " + message);
+                           System.out.flush();
+                           Thread.sleep(100);
+                           Runtime.getRuntime().halt(-1);
+                        }
+                     }
+                     super.storeMessage(message);
+
+                  }
+               };
+
+            }
+         };
+
+
+
+         System.out.println("Location::" + 
server.getConfiguration().getJournalLocation().getAbsolutePath());
+         server.start();
+         return server;
+      } catch (Exception e) {
+         e.printStackTrace();
+         return null;
+      }
+   }
+
+
+   private class StorageManagerDelegate implements StorageManager {
+
+      @Override
+      public void start() throws Exception {
+         manager.start();
+      }
+
+      @Override
+      public void stop() throws Exception {
+         manager.stop();
+      }
+
+      @Override
+      public boolean isStarted() {
+         return manager.isStarted();
+      }
+
+      @Override
+      public long generateID() {
+         return manager.generateID();
+      }
+
+      @Override
+      public long getCurrentID() {
+         return manager.getCurrentID();
+      }
+
+      @Override
+      public void criticalError(Throwable error) {
+         manager.criticalError(error);
+      }
+
+      @Override
+      public OperationContext getContext() {
+         return manager.getContext();
+      }
+
+      @Override
+      public void lineUpContext() {
+         manager.lineUpContext();
+      }
+
+      @Override
+      public OperationContext newContext(Executor executor) {
+         return manager.newContext(executor);
+      }
+
+      @Override
+      public OperationContext newSingleThreadContext() {
+         return manager.newSingleThreadContext();
+      }
+
+      @Override
+      public void setContext(OperationContext context) {
+         manager.setContext(context);
+      }
+
+      @Override
+      public void stop(boolean ioCriticalError, boolean sendFailover) throws 
Exception {
+         manager.stop(ioCriticalError, sendFailover);
+      }
+
+      @Override
+      public void pageClosed(SimpleString storeName, int pageNumber) {
+         manager.pageClosed(storeName, pageNumber);
+      }
+
+      @Override
+      public void pageDeleted(SimpleString storeName, int pageNumber) {
+         manager.pageDeleted(storeName, pageNumber);
+      }
+
+      @Override
+      public void pageWrite(PagedMessage message, int pageNumber) {
+         manager.pageWrite(message, pageNumber);
+      }
+
+      @Override
+      public void afterCompleteOperations(IOCallback run) {
+         manager.afterCompleteOperations(run);
+      }
+
+      @Override
+      public void afterStoreOperations(IOCallback run) {
+         manager.afterStoreOperations(run);
+      }
+
+      @Override
+      public boolean waitOnOperations(long timeout) throws Exception {
+         return manager.waitOnOperations(timeout);
+      }
+
+      @Override
+      public void waitOnOperations() throws Exception {
+         manager.waitOnOperations();
+      }
+
+      @Override
+      public void beforePageRead() throws Exception {
+         manager.beforePageRead();
+      }
+
+      @Override
+      public void afterPageRead() throws Exception {
+         manager.afterPageRead();
+      }
+
+      @Override
+      public ByteBuffer allocateDirectBuffer(int size) {
+         return manager.allocateDirectBuffer(size);
+      }
+
+      @Override
+      public void freeDirectBuffer(ByteBuffer buffer) {
+         manager.freeDirectBuffer(buffer);
+      }
+
+      @Override
+      public void clearContext() {
+         manager.clearContext();
+      }
+
+      @Override
+      public void confirmPendingLargeMessageTX(Transaction transaction,
+                                               long messageID,
+                                               long recordID) throws Exception 
{
+         manager.confirmPendingLargeMessageTX(transaction, messageID, 
recordID);
+      }
+
+      @Override
+      public void confirmPendingLargeMessage(long recordID) throws Exception {
+         manager.confirmPendingLargeMessage(recordID);
+      }
+
+      @Override
+      public void storeMessage(Message message) throws Exception {
+         manager.storeMessage(message);
+      }
+
+      @Override
+      public void storeReference(long queueID, long messageID, boolean last) 
throws Exception {
+         manager.storeReference(queueID, messageID, last);
+      }
+
+      @Override
+      public void deleteMessage(long messageID) throws Exception {
+         manager.deleteMessage(messageID);
+      }
+
+      @Override
+      public void storeAcknowledge(long queueID, long messageID) throws 
Exception {
+         manager.storeAcknowledge(queueID, messageID);
+      }
+
+      @Override
+      public void storeCursorAcknowledge(long queueID, PagePosition position) 
throws Exception {
+         manager.storeCursorAcknowledge(queueID, position);
+      }
+
+      @Override
+      public void updateDeliveryCount(MessageReference ref) throws Exception {
+         manager.updateDeliveryCount(ref);
+      }
+
+      @Override
+      public void updateScheduledDeliveryTime(MessageReference ref) throws 
Exception {
+         manager.updateScheduledDeliveryTime(ref);
+      }
+
+      @Override
+      public void storeDuplicateID(SimpleString address, byte[] duplID, long 
recordID) throws Exception {
+         manager.storeDuplicateID(address, duplID, recordID);
+      }
+
+      @Override
+      public void deleteDuplicateID(long recordID) throws Exception {
+         manager.deleteDuplicateID(recordID);
+      }
+
+      @Override
+      public void storeMessageTransactional(long txID, Message message) throws 
Exception {
+         manager.storeMessageTransactional(txID, message);
+      }
+
+      @Override
+      public void storeReferenceTransactional(long txID, long queueID, long 
messageID) throws Exception {
+         manager.storeReferenceTransactional(txID, queueID, messageID);
+      }
+
+      @Override
+      public void storeAcknowledgeTransactional(long txID, long queueID, long 
messageID) throws Exception {
+         manager.storeAcknowledgeTransactional(txID, queueID, messageID);
+      }
+
+      @Override
+      public void storeCursorAcknowledgeTransactional(long txID, long queueID, 
PagePosition position) throws Exception {
+         manager.storeCursorAcknowledgeTransactional(txID, queueID, position);
+      }
+
+      @Override
+      public void deleteCursorAcknowledgeTransactional(long txID, long ackID) 
throws Exception {
+         manager.deleteCursorAcknowledgeTransactional(txID, ackID);
+      }
+
+      @Override
+      public void deleteCursorAcknowledge(long ackID) throws Exception {
+         manager.deleteCursorAcknowledge(ackID);
+      }
+
+      @Override
+      public void storePageCompleteTransactional(long txID, long queueID, 
PagePosition position) throws Exception {
+         manager.storePageCompleteTransactional(txID, queueID, position);
+      }
+
+      @Override
+      public void deletePageComplete(long ackID) throws Exception {
+         manager.deletePageComplete(ackID);
+      }
+
+      @Override
+      public void updateScheduledDeliveryTimeTransactional(long txID, 
MessageReference ref) throws Exception {
+         manager.updateScheduledDeliveryTimeTransactional(txID, ref);
+      }
+
+      @Override
+      public void storeDuplicateIDTransactional(long txID,
+                                                SimpleString address,
+                                                byte[] duplID,
+                                                long recordID) throws 
Exception {
+         manager.storeDuplicateIDTransactional(txID, address, duplID, 
recordID);
+      }
+
+      @Override
+      public void updateDuplicateIDTransactional(long txID,
+                                                 SimpleString address,
+                                                 byte[] duplID,
+                                                 long recordID) throws 
Exception {
+         manager.updateDuplicateIDTransactional(txID, address, duplID, 
recordID);
+      }
+
+      @Override
+      public void deleteDuplicateIDTransactional(long txID, long recordID) 
throws Exception {
+         manager.deleteDuplicateIDTransactional(txID, recordID);
+      }
+
+      @Override
+      public LargeServerMessage createLargeMessage() {
+         return manager.createLargeMessage();
+      }
+
+      @Override
+      public LargeServerMessage createLargeMessage(long id, Message message) 
throws Exception {
+         return manager.createLargeMessage(id, message);
+      }
+
+      @Override
+      public SequentialFile createFileForLargeMessage(long messageID, 
LargeMessageExtension extension) {
+         return manager.createFileForLargeMessage(messageID, extension);
+      }
+
+      @Override
+      public void prepare(long txID, Xid xid) throws Exception {
+         manager.prepare(txID, xid);
+      }
+
+      @Override
+      public void commit(long txID) throws Exception {
+         manager.commit(txID);
+      }
+
+      @Override
+      public void commit(long txID, boolean lineUpContext) throws Exception {
+         manager.commit(txID, lineUpContext);
+      }
+
+      @Override
+      public void rollback(long txID) throws Exception {
+         manager.rollback(txID);
+      }
+
+      @Override
+      public void rollbackBindings(long txID) throws Exception {
+         manager.rollbackBindings(txID);
+      }
+
+      @Override
+      public void commitBindings(long txID) throws Exception {
+         manager.commitBindings(txID);
+      }
+
+      @Override
+      public void storePageTransaction(long txID, PageTransactionInfo 
pageTransaction) throws Exception {
+         manager.storePageTransaction(txID, pageTransaction);
+      }
+
+      @Override
+      public void updatePageTransaction(long txID, PageTransactionInfo 
pageTransaction, int depage) throws Exception {
+         manager.updatePageTransaction(txID, pageTransaction, depage);
+      }
+
+      @Override
+      public void deletePageTransactional(long recordID) throws Exception {
+         manager.deletePageTransactional(recordID);
+      }
+
+      @Override
+      public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
+                                                       PagingManager 
pagingManager,
+                                                       ResourceManager 
resourceManager,
+                                                       Map<Long, 
QueueBindingInfo> queueInfos,
+                                                       Map<SimpleString, 
List<Pair<byte[], Long>>> duplicateIDMap,
+                                                       Set<Pair<Long, Long>> 
pendingLargeMessages,
+                                                       List<PageCountPending> 
pendingNonTXPageCounter,
+                                                       JournalLoader 
journalLoader) throws Exception {
+         return manager.loadMessageJournal(postOffice, pagingManager, 
resourceManager, queueInfos, duplicateIDMap, pendingLargeMessages, 
pendingNonTXPageCounter, journalLoader);
+      }
+
+      @Override
+      public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws 
Exception {
+         return manager.storeHeuristicCompletion(xid, isCommit);
+      }
+
+      @Override
+      public void deleteHeuristicCompletion(long id) throws Exception {
+         manager.deleteHeuristicCompletion(id);
+      }
+
+      @Override
+      public void addQueueBinding(long tx, Binding binding) throws Exception {
+         manager.addQueueBinding(tx, binding);
+      }
+
+      @Override
+      public void deleteQueueBinding(long tx, long queueBindingID) throws 
Exception {
+         manager.deleteQueueBinding(tx, queueBindingID);
+      }
+
+      @Override
+      public long storeQueueStatus(long queueID, QueueStatus status) throws 
Exception {
+         return manager.storeQueueStatus(queueID, status);
+      }
+
+      @Override
+      public void deleteQueueStatus(long recordID) throws Exception {
+         manager.deleteQueueStatus(recordID);
+      }
+
+      @Override
+      public void addAddressBinding(long tx, AddressInfo addressInfo) throws 
Exception {
+         manager.addAddressBinding(tx, addressInfo);
+      }
+
+      @Override
+      public void deleteAddressBinding(long tx, long addressBindingID) throws 
Exception {
+         manager.deleteAddressBinding(tx, addressBindingID);
+      }
+
+      @Override
+      public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> 
queueBindingInfos,
+                                                       List<GroupingInfo> 
groupingInfos,
+                                                       
List<AddressBindingInfo> addressBindingInfos) throws Exception {
+         return manager.loadBindingJournal(queueBindingInfos, groupingInfos, 
addressBindingInfos);
+      }
+
+      @Override
+      public void addGrouping(GroupBinding groupBinding) throws Exception {
+         manager.addGrouping(groupBinding);
+      }
+
+      @Override
+      public void deleteGrouping(long tx, GroupBinding groupBinding) throws 
Exception {
+         manager.deleteGrouping(tx, groupBinding);
+      }
+
+      @Override
+      public void storeAddressSetting(PersistedAddressSetting addressSetting) 
throws Exception {
+         manager.storeAddressSetting(addressSetting);
+      }
+
+      @Override
+      public void deleteAddressSetting(SimpleString addressMatch) throws 
Exception {
+         manager.deleteAddressSetting(addressMatch);
+      }
+
+      @Override
+      public List<PersistedAddressSetting> recoverAddressSettings() throws 
Exception {
+         return manager.recoverAddressSettings();
+      }
+
+      @Override
+      public void storeSecurityRoles(PersistedRoles persistedRoles) throws 
Exception {
+         manager.storeSecurityRoles(persistedRoles);
+      }
+
+      @Override
+      public void deleteSecurityRoles(SimpleString addressMatch) throws 
Exception {
+         manager.deleteSecurityRoles(addressMatch);
+      }
+
+      @Override
+      public List<PersistedRoles> recoverPersistedRoles() throws Exception {
+         return manager.recoverPersistedRoles();
+      }
+
+      @Override
+      public long storePageCounter(long txID, long queueID, long value) throws 
Exception {
+         return manager.storePageCounter(txID, queueID, value);
+      }
+
+      @Override
+      public long storePendingCounter(long queueID, long pageID, int inc) 
throws Exception {
+         return manager.storePendingCounter(queueID, pageID, inc);
+      }
+
+      @Override
+      public void deleteIncrementRecord(long txID, long recordID) throws 
Exception {
+         manager.deleteIncrementRecord(txID, recordID);
+      }
+
+      @Override
+      public void deletePageCounter(long txID, long recordID) throws Exception 
{
+         manager.deletePageCounter(txID, recordID);
+      }
+
+      @Override
+      public void deletePendingPageCounter(long txID, long recordID) throws 
Exception {
+         manager.deletePendingPageCounter(txID, recordID);
+      }
+
+      @Override
+      public long storePageCounterInc(long txID, long queueID, int add) throws 
Exception {
+         return manager.storePageCounterInc(txID, queueID, add);
+      }
+
+      @Override
+      public long storePageCounterInc(long queueID, int add) throws Exception {
+         return manager.storePageCounterInc(queueID, add);
+      }
+
+      @Override
+      public Journal getBindingsJournal() {
+         return manager.getBindingsJournal();
+      }
+
+      @Override
+      public Journal getMessageJournal() {
+         return manager.getMessageJournal();
+      }
+
+      @Override
+      public void startReplication(ReplicationManager replicationManager,
+                                   PagingManager pagingManager,
+                                   String nodeID,
+                                   boolean autoFailBack,
+                                   long initialReplicationSyncTimeout) throws 
Exception {
+         manager.startReplication(replicationManager, pagingManager, nodeID, 
autoFailBack, initialReplicationSyncTimeout);
+      }
+
+      @Override
+      public boolean addToPage(PagingStore store,
+                               Message msg,
+                               Transaction tx,
+                               RouteContextList listCtx) throws Exception {
+         return manager.addToPage(store, msg, tx, listCtx);
+      }
+
+      @Override
+      public void stopReplication() {
+         manager.stopReplication();
+      }
+
+      @Override
+      public void addBytesToLargeMessage(SequentialFile appendFile, long 
messageID, byte[] bytes) throws Exception {
+         manager.addBytesToLargeMessage(appendFile, messageID, bytes);
+      }
+
+      @Override
+      public void storeID(long journalID, long id) throws Exception {
+         manager.storeID(journalID, id);
+      }
+
+      @Override
+      public void deleteID(long journalD) throws Exception {
+         manager.deleteID(journalD);
+      }
+
+      @Override
+      public void readLock() {
+         manager.readLock();
+      }
+
+      @Override
+      public void readUnLock() {
+         manager.readUnLock();
+      }
+
+      @Override
+      public void persistIdGenerator() {
+         manager.persistIdGenerator();
+      }
+
+      @Override
+      public void injectMonitor(FileStoreMonitor monitor) throws Exception {
+         manager.injectMonitor(monitor);
+      }
+
+      private final StorageManager manager;
+
+      StorageManagerDelegate(StorageManager manager) {
+         this.manager = manager;
+      }
+   }
+
+}
\ No newline at end of file

Reply via email to