Repository: activemq Updated Branches: refs/heads/master bfbdd3c5a -> b07821ab6
[AMQ-6625] remove kahadbioexceptionhandler by pushing allowIOResumption into persistence adapter. This allows the lease locker to still be used with kahadb for stopStartConnectors support Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b07821ab Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b07821ab Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b07821ab Branch: refs/heads/master Commit: b07821ab6494b43e1ae8877eaf740effc7896b84 Parents: bfbdd3c Author: gtully <gary.tu...@gmail.com> Authored: Thu Jun 15 17:27:47 2017 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Thu Jun 15 17:28:31 2017 +0100 ---------------------------------------------------------------------- .../activemq/store/PersistenceAdapter.java | 2 + .../store/memory/MemoryPersistenceAdapter.java | 3 ++ .../util/DefaultIOExceptionHandler.java | 5 +++ .../store/jdbc/JDBCPersistenceAdapter.java | 3 ++ .../journal/JournalPersistenceAdapter.java | 5 +++ .../store/kahadb/KahaDBIOExceptionHandler.java | 43 -------------------- .../store/kahadb/KahaDBPersistenceAdapter.java | 5 +++ .../kahadb/MultiKahaDBPersistenceAdapter.java | 7 ++++ .../activemq/store/kahadb/TempKahaDBStore.java | 7 ++++ .../apache/activemq/leveldb/LevelDBStore.scala | 2 + .../leveldb/replicated/ProxyLevelDBStore.scala | 2 + .../RedeliveryRestartWithExceptionTest.java | 7 +++- 12 files changed, 47 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java index 01a9634..07063b4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java @@ -206,4 +206,6 @@ public interface PersistenceAdapter extends Service { * @return the last stored sequence id or -1 if no suppression needed */ long getLastProducerSequenceId(ProducerId id) throws IOException; + + void allowIOResumption(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index 5c073c3..a1233e0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -238,6 +238,9 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter, NoLocalSubs } @Override + public void allowIOResumption() {} + + @Override public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { // We could eventuall implement an in memory scheduler. throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java index 0ee6743..7668364 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java +++ b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java @@ -166,6 +166,11 @@ import org.slf4j.LoggerFactory; } protected void allowIOResumption() { + try { + broker.getPersistenceAdapter().allowIOResumption(); + } catch (IOException e) { + LOG.warn("Failed to allow IO resumption", e); + } } private void stopBroker(Exception exception) { http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 5da7592..a6ad870 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -292,6 +292,9 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements } @Override + public void allowIOResumption() {} + + @Override public void init() throws Exception { getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 787b277..10b5c7a 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -800,6 +800,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } @Override + public void allowIOResumption() { + longTermPersistence.allowIOResumption(); + } + + @Override public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { return longTermPersistence.createJobSchedulerStore(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java deleted file mode 100644 index 9ddc6d2..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBIOExceptionHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import org.apache.activemq.util.DefaultIOExceptionHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * @org.apache.xbean.XBean - */ -public class KahaDBIOExceptionHandler extends DefaultIOExceptionHandler { - - private static final Logger LOG = LoggerFactory - .getLogger(KahaDBIOExceptionHandler.class); - - protected void allowIOResumption() { - try { - if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { - KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - kahaDBPersistenceAdapter.getStore().allowIOResumption(); - } - } catch (IOException e) { - LOG.warn("Failed to allow IO resumption", e); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 297f844..c4f480c 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -163,6 +163,11 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements return this.letter.getLastProducerSequenceId(id); } + @Override + public void allowIOResumption() { + this.letter.allowIOResumption(); + } + /** * @param destination * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index 4fa6b3d..4a30d0a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -289,6 +289,13 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } @Override + public void allowIOResumption() { + for (PersistenceAdapter persistenceAdapter : adapters) { + persistenceAdapter.allowIOResumption(); + } + } + + @Override public void removeQueueMessageStore(ActiveMQQueue destination) { PersistenceAdapter adapter = null; try { http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java index 9686913..7048b09 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java @@ -648,6 +648,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } @Override + public void allowIOResumption() { + if (pageFile != null) { + pageFile.allowIOResumption(); + } + } + + @Override public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; } http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index cb28173..593ec9e 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -1147,4 +1147,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def rollbackTransaction(context: ConnectionContext): Unit = {} def createClient = new LevelDBClient(this); + + def allowIOResumption() = {} } http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala index 7b62a95..9822fe2 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ProxyLevelDBStore.scala @@ -132,4 +132,6 @@ abstract class ProxyLevelDBStore extends LockableServiceSupport with BrokerServi def removePList(name: String): Boolean = { return proxy_target.removePList(name) } + + def allowIOResumption() = {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/b07821ab/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java index 2d840ab..b032e25 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java @@ -416,7 +416,12 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport { public long getLastProducerSequenceId(ProducerId id) throws IOException { return kahaDB.getLastProducerSequenceId(id); } - + + @Override + public void allowIOResumption() { + kahaDB.allowIOResumption(); + } + } private class ProxyMessageStoreWithUpdateException extends ProxyMessageStore {