ARTEMIS-1489 Adding Timed Buffer into Critical Analyzer
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2bf690e2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2bf690e2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2bf690e2

Branch: refs/heads/master
Commit: 2bf690e21bdd23aaf7bdd42661be9700c3284688
Parents: 5a476a1
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Mon Oct 30 12:10:59 2017 -0400
Committer: Justin Bertram <jbert...@apache.org>
Committed: Tue Oct 31 08:33:44 2017 -0500

----------------------------------------------------------------------
 .../utils/critical/CriticalAnalyzerImpl.java    |  2 +
 .../utils/critical/CriticalComponentImpl.java   |  4 ++
 .../core/io/AbstractSequentialFileFactory.java  | 12 +++-
 .../core/io/aio/AIOSequentialFileFactory.java   | 12 ++--
 .../artemis/core/io/buffer/TimedBuffer.java     | 58 ++++++++++++--------
 .../io/mapped/MappedSequentialFileFactory.java  |  4 +-
 .../core/io/nio/NIOSequentialFileFactory.java   | 12 ++--
 .../artemis/core/io/JournalTptBenchmark.java    |  4 +-
 .../core/io/SequentialFileTptBenchmark.java     |  4 +-
 .../impl/journal/JournalStorageManager.java     |  4 +-
 .../unit/core/journal/impl/TimedBufferTest.java | 10 ++--
 11 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
index 1c2c0eb..6a9a0dd 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java @@ -147,6 +147,8 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { logger.warn(e.getMessage(), e); } } + + actions.clear(); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java index c1c5602..07d5a3f 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java @@ -25,6 +25,10 @@ public class CriticalComponentImpl implements CriticalComponent { private final CriticalMeasure[] measures; private final CriticalAnalyzer analyzer; + public CriticalAnalyzer getCriticalAnalyzer() { + return analyzer; + } + public CriticalComponentImpl(CriticalAnalyzer analyzer, int numberOfPaths) { if (analyzer == null) { analyzer = EmptyCriticalAnalyzer.getInstance(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java index 0507373..b9ea6a8 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java +++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java @@ -33,6 +33,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.core.io.buffer.TimedBuffer; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; +import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.jboss.logging.Logger; /** @@ -74,11 +76,17 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac final int bufferTimeout, final int maxIO, final boolean logRates, - final IOCriticalErrorListener criticalErrorListener) { + final IOCriticalErrorListener criticalErrorListener, + CriticalAnalyzer criticalAnalyzer) { this.journalDir = journalDir; + if (criticalAnalyzer == null) { + criticalAnalyzer = EmptyCriticalAnalyzer.getInstance(); + } + if (buffered && bufferTimeout > 0) { - timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates); + timedBuffer = new TimedBuffer(criticalAnalyzer, bufferSize, bufferTimeout, logRates); + criticalAnalyzer.add(timedBuffer); } else { timedBuffer = null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index e8cc97e..d8288e6 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -35,6 +35,7 @@ import 
org.apache.activemq.artemis.jlibaio.LibaioFile; import org.apache.activemq.artemis.jlibaio.SubmitInfo; import org.apache.activemq.artemis.jlibaio.util.CallbackCache; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.jboss.logging.Logger; public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory { @@ -56,11 +57,11 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor private static final String AIO_TEST_FILE = ".aio-test"; public AIOSequentialFileFactory(final File journalDir, int maxIO) { - this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, null); + this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, null, null); } public AIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, int maxIO) { - this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, listener); + this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, listener, null); } public AIOSequentialFileFactory(final File journalDir, @@ -68,7 +69,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor final int bufferTimeout, final int maxIO, final boolean logRates) { - this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null); + this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null, null); } public AIOSequentialFileFactory(final File journalDir, @@ -76,8 +77,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor final int bufferTimeout, final int maxIO, final boolean logRates, - final IOCriticalErrorListener listener) { - 
super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener); + final IOCriticalErrorListener listener, + final CriticalAnalyzer analyzer) { + super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer); callbackPool = new CallbackCache<>(maxIO); if (logger.isTraceEnabled()) { logger.trace("New AIO File Created"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java index f469bf8..e0fe149 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -32,9 +32,14 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; +import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.jboss.logging.Logger; -public final class TimedBuffer { +public final class TimedBuffer extends CriticalComponentImpl { + + protected static final int CRITICAL_PATHS = 1; + protected static final int CRITICAL_PATH_FLUSH = 0; private static final Logger logger = Logger.getLogger(TimedBuffer.class); @@ -99,7 +104,8 @@ public final class TimedBuffer { // Public -------------------------------------------------------- - public TimedBuffer(final int size, final int timeout, final boolean logRates) { + public TimedBuffer(CriticalAnalyzer analyzer, final 
int size, final int timeout, final boolean logRates) { + super(analyzer, CRITICAL_PATHS); bufferSize = size; this.logRates = logRates; @@ -286,38 +292,42 @@ public final class TimedBuffer { throw new IllegalStateException("TimedBuffer is not started"); } - if (!delayFlush && buffer.writerIndex() > 0) { - int pos = buffer.writerIndex(); - - if (logRates) { - bytesFlushed.addAndGet(pos); - } + enterCritical(CRITICAL_PATH_FLUSH); + try { + if (!delayFlush && buffer.writerIndex() > 0) { + int pos = buffer.writerIndex(); - final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); - //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!! - bufferToFlush.limit(pos); - //perform memcpy under the hood due to the off heap buffer - buffer.getBytes(0, bufferToFlush); + if (logRates) { + bytesFlushed.addAndGet(pos); + } + final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos); + //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!! 
+ bufferToFlush.limit(pos); + //perform memcpy under the hood due to the off heap buffer + buffer.getBytes(0, bufferToFlush); - bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks); + bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks); - stopSpin(); + stopSpin(); - pendingSync = false; + pendingSync = false; - // swap the instance as the previous callback list is being used asynchronously - callbacks = new ArrayList<>(); + // swap the instance as the previous callback list is being used asynchronously + callbacks = new ArrayList<>(); - buffer.clear(); + buffer.clear(); - bufferLimit = 0; + bufferLimit = 0; - flushesDone.incrementAndGet(); + flushesDone.incrementAndGet(); - return pos > 0; - } else { - return false; + return pos > 0; + } else { + return false; + } + } finally { + leaveCritical(CRITICAL_PATH_FLUSH); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index 2be2ff2..2cdaba1 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -40,7 +40,9 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac final int bufferTimeout, IOCriticalErrorListener criticalErrorListener) { - super(directory, buffered, bufferSize, bufferTimeout, 1, false, criticalErrorListener); + // at the moment we only use the critical analyzer on the timed buffer + // MappedSequentialFile is not using any buffering, hence we just pass in 
null + super(directory, buffered, bufferSize, bufferTimeout, 1, false, criticalErrorListener, null); this.capacity = capacity; this.setDatasync(true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index 781176e..b585b24 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.utils.Env; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory { @@ -42,7 +43,7 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor } public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener, final int maxIO) { - this(journalDir, false, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener); + this(journalDir, false, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener, null); } public NIOSequentialFileFactory(final File journalDir, final boolean buffered, final int maxIO) { @@ -53,7 +54,7 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor 
final boolean buffered, final IOCriticalErrorListener listener, final int maxIO) { - this(journalDir, buffered, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener); + this(journalDir, buffered, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener, null); } public NIOSequentialFileFactory(final File journalDir, @@ -62,7 +63,7 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor final int bufferTimeout, final int maxIO, final boolean logRates) { - this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, null); + this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, null, null); } public NIOSequentialFileFactory(final File journalDir, @@ -71,8 +72,9 @@ public final class NIOSequentialFileFactory extends AbstractSequentialFileFactor final int bufferTimeout, final int maxIO, final boolean logRates, - final IOCriticalErrorListener listener) { - super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener); + final IOCriticalErrorListener listener, + final CriticalAnalyzer analyzer) { + super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer); this.bufferPooling = true; this.bytesPool = new ThreadLocal<>(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java index b0096b7..4562585 100644 --- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java +++ 
b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java @@ -66,10 +66,10 @@ public class JournalTptBenchmark { .setDatasync(dataSync); break; case Nio: - factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); + factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, null).setDatasync(dataSync); break; case Aio: - factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync); + factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, null).setDatasync(dataSync); //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse(); if (!LibaioContext.isLoaded()) { throw new IllegalStateException("lib AIO not loaded!"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java index ed14ae4..8ba0ccc 100644 --- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java @@ -59,10 +59,10 @@ public class SequentialFileTptBenchmark { factory = new 
MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync); break; case Nio: - factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync); + factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, null).setDatasync(dataSync); break; case Aio: - factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync); + factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, null).setDatasync(dataSync); //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse(); if (!LibaioContext.isLoaded()) { throw new IllegalStateException("lib AIO not loaded!"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 84adde4..160d12d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -133,11 +133,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager { case NIO: ActiveMQServerLogger.LOGGER.journalUseNIO(); - journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener); + journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener, getCriticalAnalyzer()); break; case ASYNCIO: ActiveMQServerLogger.LOGGER.journalUseAIO(); - journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener); + journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener, getCriticalAnalyzer()); break; case MAPPED: ActiveMQServerLogger.LOGGER.journalUseMAPPED(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java index 2462ee7..3619db2 100644 --- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java @@ -87,7 +87,7 @@ public class TimedBufferTest extends ActiveMQTestBase { } } - TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS, false); + TimedBuffer timedBuffer = new TimedBuffer(null, 100, TimedBufferTest.ONE_SECOND_IN_NANOS, false); timedBuffer.start(); @@ -155,7 +155,7 @@ public class TimedBufferTest extends ActiveMQTestBase { } } - TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false); + TimedBuffer timedBuffer = new TimedBuffer(null, 100, TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false); timedBuffer.start(); @@ -393,7 +393,7 @@ public class TimedBufferTest extends ActiveMQTestBase { //it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match final long deviceTime = timeout; final int bufferSize = Env.osPageSize(); - final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false); + final TimedBuffer timedBuffer = new TimedBuffer(null, bufferSize, (int) timeout, false); timedBuffer.start(); try (NonBlockingObserver observer = new NonBlockingObserver(bufferSize, deviceTime)) { timedBuffer.setObserver(observer); @@ -434,7 +434,7 @@ public class TimedBufferTest extends ActiveMQTestBase { //it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match final long deviceTime = timeout; final int bufferSize = Env.osPageSize(); - final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false); + final TimedBuffer timedBuffer = new TimedBuffer(null, bufferSize, (int) timeout, false); timedBuffer.start(); try (BlockingObserver observer = new BlockingObserver(bufferSize, deviceTime)) { timedBuffer.setObserver(observer); @@ -489,7 +489,7 @@ public class TimedBufferTest extends ActiveMQTestBase { } 
} - TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 10, false); + TimedBuffer timedBuffer = new TimedBuffer(null, 100, TimedBufferTest.ONE_SECOND_IN_NANOS / 10, false); timedBuffer.start();