ARTEMIS-1489 Adding Timed Buffer into Critical Analyzer

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2bf690e2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2bf690e2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2bf690e2

Branch: refs/heads/master
Commit: 2bf690e21bdd23aaf7bdd42661be9700c3284688
Parents: 5a476a1
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Mon Oct 30 12:10:59 2017 -0400
Committer: Justin Bertram <jbert...@apache.org>
Committed: Tue Oct 31 08:33:44 2017 -0500

----------------------------------------------------------------------
 .../utils/critical/CriticalAnalyzerImpl.java    |  2 +
 .../utils/critical/CriticalComponentImpl.java   |  4 ++
 .../core/io/AbstractSequentialFileFactory.java  | 12 +++-
 .../core/io/aio/AIOSequentialFileFactory.java   | 12 ++--
 .../artemis/core/io/buffer/TimedBuffer.java     | 58 ++++++++++++--------
 .../io/mapped/MappedSequentialFileFactory.java  |  4 +-
 .../core/io/nio/NIOSequentialFileFactory.java   | 12 ++--
 .../artemis/core/io/JournalTptBenchmark.java    |  4 +-
 .../core/io/SequentialFileTptBenchmark.java     |  4 +-
 .../impl/journal/JournalStorageManager.java     |  4 +-
 .../unit/core/journal/impl/TimedBufferTest.java | 10 ++--
 11 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
index 1c2c0eb..6a9a0dd 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java
@@ -147,6 +147,8 @@ public class CriticalAnalyzerImpl implements 
CriticalAnalyzer {
             logger.warn(e.getMessage(), e);
          }
       }
+
+      actions.clear();
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
index c1c5602..07d5a3f 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponentImpl.java
@@ -25,6 +25,10 @@ public class CriticalComponentImpl implements 
CriticalComponent {
    private final CriticalMeasure[] measures;
    private final CriticalAnalyzer analyzer;
 
+   public CriticalAnalyzer getCriticalAnalyzer() {
+      return analyzer;
+   }
+
    public CriticalComponentImpl(CriticalAnalyzer analyzer, int numberOfPaths) {
       if (analyzer == null) {
          analyzer = EmptyCriticalAnalyzer.getInstance();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
index 0507373..b9ea6a8 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
@@ -33,6 +33,8 @@ import 
org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.jboss.logging.Logger;
 
 /**
@@ -74,11 +76,17 @@ public abstract class AbstractSequentialFileFactory 
implements SequentialFileFac
                                            final int bufferTimeout,
                                            final int maxIO,
                                            final boolean logRates,
-                                           final IOCriticalErrorListener 
criticalErrorListener) {
+                                           final IOCriticalErrorListener 
criticalErrorListener,
+                                           CriticalAnalyzer criticalAnalyzer) {
       this.journalDir = journalDir;
 
+      if (criticalAnalyzer == null) {
+         criticalAnalyzer = EmptyCriticalAnalyzer.getInstance();
+      }
+
       if (buffered && bufferTimeout > 0) {
-         timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates);
+         timedBuffer = new TimedBuffer(criticalAnalyzer, bufferSize, 
bufferTimeout, logRates);
+         criticalAnalyzer.add(timedBuffer);
       } else {
          timedBuffer = null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
index e8cc97e..d8288e6 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioFile;
 import org.apache.activemq.artemis.jlibaio.SubmitInfo;
 import org.apache.activemq.artemis.jlibaio.util.CallbackCache;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 import org.jboss.logging.Logger;
 
 public final class AIOSequentialFileFactory extends 
AbstractSequentialFileFactory {
@@ -56,11 +57,11 @@ public final class AIOSequentialFileFactory extends 
AbstractSequentialFileFactor
    private static final String AIO_TEST_FILE = ".aio-test";
 
    public AIOSequentialFileFactory(final File journalDir, int maxIO) {
-      this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, null);
+      this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, null, null);
    }
 
    public AIOSequentialFileFactory(final File journalDir, final 
IOCriticalErrorListener listener, int maxIO) {
-      this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, listener);
+      this(journalDir, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, maxIO, false, listener, 
null);
    }
 
    public AIOSequentialFileFactory(final File journalDir,
@@ -68,7 +69,7 @@ public final class AIOSequentialFileFactory extends 
AbstractSequentialFileFactor
                                    final int bufferTimeout,
                                    final int maxIO,
                                    final boolean logRates) {
-      this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null);
+      this(journalDir, bufferSize, bufferTimeout, maxIO, logRates, null, null);
    }
 
    public AIOSequentialFileFactory(final File journalDir,
@@ -76,8 +77,9 @@ public final class AIOSequentialFileFactory extends 
AbstractSequentialFileFactor
                                    final int bufferTimeout,
                                    final int maxIO,
                                    final boolean logRates,
-                                   final IOCriticalErrorListener listener) {
-      super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, 
listener);
+                                   final IOCriticalErrorListener listener,
+                                   final CriticalAnalyzer analyzer) {
+      super(journalDir, true, bufferSize, bufferTimeout, maxIO, logRates, 
listener, analyzer);
       callbackPool = new CallbackCache<>(maxIO);
       if (logger.isTraceEnabled()) {
          logger.trace("New AIO File Created");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index f469bf8..e0fe149 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -32,9 +32,14 @@ import 
org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.jboss.logging.Logger;
 
-public final class TimedBuffer {
+public final class TimedBuffer extends CriticalComponentImpl {
+
+   protected static final int CRITICAL_PATHS = 1;
+   protected static final int CRITICAL_PATH_FLUSH = 0;
 
    private static final Logger logger = Logger.getLogger(TimedBuffer.class);
 
@@ -99,7 +104,8 @@ public final class TimedBuffer {
 
    // Public --------------------------------------------------------
 
-   public TimedBuffer(final int size, final int timeout, final boolean 
logRates) {
+   public TimedBuffer(CriticalAnalyzer analyzer, final int size, final int 
timeout, final boolean logRates) {
+      super(analyzer, CRITICAL_PATHS);
       bufferSize = size;
 
       this.logRates = logRates;
@@ -286,38 +292,42 @@ public final class TimedBuffer {
             throw new IllegalStateException("TimedBuffer is not started");
          }
 
-         if (!delayFlush && buffer.writerIndex() > 0) {
-            int pos = buffer.writerIndex();
-
-            if (logRates) {
-               bytesFlushed.addAndGet(pos);
-            }
+         enterCritical(CRITICAL_PATH_FLUSH);
+         try {
+            if (!delayFlush && buffer.writerIndex() > 0) {
+               int pos = buffer.writerIndex();
 
-            final ByteBuffer bufferToFlush = 
bufferObserver.newBuffer(bufferSize, pos);
-            //bufferObserver::newBuffer doesn't necessary return a buffer with 
limit == pos or limit == bufferSize!!
-            bufferToFlush.limit(pos);
-            //perform memcpy under the hood due to the off heap buffer
-            buffer.getBytes(0, bufferToFlush);
+               if (logRates) {
+                  bytesFlushed.addAndGet(pos);
+               }
 
+               final ByteBuffer bufferToFlush = 
bufferObserver.newBuffer(bufferSize, pos);
+               //bufferObserver::newBuffer doesn't necessary return a buffer 
with limit == pos or limit == bufferSize!!
+               bufferToFlush.limit(pos);
+               //perform memcpy under the hood due to the off heap buffer
+               buffer.getBytes(0, bufferToFlush);
 
-            bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+               bufferObserver.flushBuffer(bufferToFlush, pendingSync, 
callbacks);
 
-            stopSpin();
+               stopSpin();
 
-            pendingSync = false;
+               pendingSync = false;
 
-            // swap the instance as the previous callback list is being used 
asynchronously
-            callbacks = new ArrayList<>();
+               // swap the instance as the previous callback list is being 
used asynchronously
+               callbacks = new ArrayList<>();
 
-            buffer.clear();
+               buffer.clear();
 
-            bufferLimit = 0;
+               bufferLimit = 0;
 
-            flushesDone.incrementAndGet();
+               flushesDone.incrementAndGet();
 
-            return pos > 0;
-         } else {
-            return false;
+               return pos > 0;
+            } else {
+               return false;
+            }
+         } finally {
+            leaveCritical(CRITICAL_PATH_FLUSH);
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
index 2be2ff2..2cdaba1 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java
@@ -40,7 +40,9 @@ public final class MappedSequentialFileFactory extends 
AbstractSequentialFileFac
                                        final int bufferTimeout,
                                        IOCriticalErrorListener 
criticalErrorListener) {
 
-      super(directory, buffered, bufferSize, bufferTimeout, 1, false, 
criticalErrorListener);
+      // at the moment we only use the critical analyzer on the timed buffer
+      // MappedSequentialFile is not using any buffering, hence we just pass 
in null
+      super(directory, buffered, bufferSize, bufferTimeout, 1, false, 
criticalErrorListener, null);
 
       this.capacity = capacity;
       this.setDatasync(true);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
index 781176e..b585b24 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
@@ -27,6 +27,7 @@ import 
org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.utils.Env;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 
 public final class NIOSequentialFileFactory extends 
AbstractSequentialFileFactory {
 
@@ -42,7 +43,7 @@ public final class NIOSequentialFileFactory extends 
AbstractSequentialFileFactor
    }
 
    public NIOSequentialFileFactory(final File journalDir, final 
IOCriticalErrorListener listener, final int maxIO) {
-      this(journalDir, false, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener);
+      this(journalDir, false, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener, 
null);
    }
 
    public NIOSequentialFileFactory(final File journalDir, final boolean 
buffered, final int maxIO) {
@@ -53,7 +54,7 @@ public final class NIOSequentialFileFactory extends 
AbstractSequentialFileFactor
                                    final boolean buffered,
                                    final IOCriticalErrorListener listener,
                                    final int maxIO) {
-      this(journalDir, buffered, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener);
+      this(journalDir, buffered, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, maxIO, false, listener, 
null);
    }
 
    public NIOSequentialFileFactory(final File journalDir,
@@ -62,7 +63,7 @@ public final class NIOSequentialFileFactory extends 
AbstractSequentialFileFactor
                                    final int bufferTimeout,
                                    final int maxIO,
                                    final boolean logRates) {
-      this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, 
null);
+      this(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, 
null, null);
    }
 
    public NIOSequentialFileFactory(final File journalDir,
@@ -71,8 +72,9 @@ public final class NIOSequentialFileFactory extends 
AbstractSequentialFileFactor
                                    final int bufferTimeout,
                                    final int maxIO,
                                    final boolean logRates,
-                                   final IOCriticalErrorListener listener) {
-      super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, 
listener);
+                                   final IOCriticalErrorListener listener,
+                                   final CriticalAnalyzer analyzer) {
+      super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, 
listener, analyzer);
       this.bufferPooling = true;
       this.bytesPool = new ThreadLocal<>();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
index b0096b7..4562585 100644
--- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
@@ -66,10 +66,10 @@ public class JournalTptBenchmark {
                .setDatasync(dataSync);
             break;
          case Nio:
-            factory = new NIOSequentialFileFactory(tmpDirectory, true, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, 
null).setDatasync(dataSync);
+            factory = new NIOSequentialFileFactory(tmpDirectory, true, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, 
null).setDatasync(dataSync);
             break;
          case Aio:
-            factory = new AIOSequentialFileFactory(tmpDirectory, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, 
null).setDatasync(dataSync);
+            factory = new AIOSequentialFileFactory(tmpDirectory, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, 
null).setDatasync(dataSync);
             //disable it when using directly the same buffer: 
((AIOSequentialFileFactory)factory).disableBufferReuse();
             if (!LibaioContext.isLoaded()) {
                throw new IllegalStateException("lib AIO not loaded!");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
index ed14ae4..8ba0ccc 100644
--- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
@@ -59,10 +59,10 @@ public class SequentialFileTptBenchmark {
             factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, 
true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 
null).setDatasync(dataSync);
             break;
          case Nio:
-            factory = new NIOSequentialFileFactory(tmpDirectory, true, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, 
null).setDatasync(dataSync);
+            factory = new NIOSequentialFileFactory(tmpDirectory, true, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, 
null).setDatasync(dataSync);
             break;
          case Aio:
-            factory = new AIOSequentialFileFactory(tmpDirectory, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, 
null).setDatasync(dataSync);
+            factory = new AIOSequentialFileFactory(tmpDirectory, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 
ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, 
null).setDatasync(dataSync);
             //disable it when using directly the same buffer: 
((AIOSequentialFileFactory)factory).disableBufferReuse();
             if (!LibaioContext.isLoaded()) {
                throw new IllegalStateException("lib AIO not loaded!");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 84adde4..160d12d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -133,11 +133,11 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
 
          case NIO:
             ActiveMQServerLogger.LOGGER.journalUseNIO();
-            journalFF = new 
NIOSequentialFileFactory(config.getJournalLocation(), true, 
config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), 
config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), 
criticalErrorListener);
+            journalFF = new 
NIOSequentialFileFactory(config.getJournalLocation(), true, 
config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), 
config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), 
criticalErrorListener, getCriticalAnalyzer());
             break;
          case ASYNCIO:
             ActiveMQServerLogger.LOGGER.journalUseAIO();
-            journalFF = new 
AIOSequentialFileFactory(config.getJournalLocation(), 
config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), 
config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), 
criticalErrorListener);
+            journalFF = new 
AIOSequentialFileFactory(config.getJournalLocation(), 
config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), 
config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), 
criticalErrorListener, getCriticalAnalyzer());
             break;
          case MAPPED:
             ActiveMQServerLogger.LOGGER.journalUseMAPPED();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2bf690e2/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
index 2462ee7..3619db2 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
@@ -87,7 +87,7 @@ public class TimedBufferTest extends ActiveMQTestBase {
          }
       }
 
-      TimedBuffer timedBuffer = new TimedBuffer(100, 
TimedBufferTest.ONE_SECOND_IN_NANOS, false);
+      TimedBuffer timedBuffer = new TimedBuffer(null, 100, 
TimedBufferTest.ONE_SECOND_IN_NANOS, false);
 
       timedBuffer.start();
 
@@ -155,7 +155,7 @@ public class TimedBufferTest extends ActiveMQTestBase {
          }
       }
 
-      TimedBuffer timedBuffer = new TimedBuffer(100, 
TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false);
+      TimedBuffer timedBuffer = new TimedBuffer(null, 100, 
TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false);
 
       timedBuffer.start();
 
@@ -393,7 +393,7 @@ public class TimedBufferTest extends ActiveMQTestBase {
       //it is optimistic: the timeout and the blockingDeviceFlushTime are a 
perfect match
       final long deviceTime = timeout;
       final int bufferSize = Env.osPageSize();
-      final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) 
timeout, false);
+      final TimedBuffer timedBuffer = new TimedBuffer(null, bufferSize, (int) 
timeout, false);
       timedBuffer.start();
       try (NonBlockingObserver observer = new NonBlockingObserver(bufferSize, 
deviceTime)) {
          timedBuffer.setObserver(observer);
@@ -434,7 +434,7 @@ public class TimedBufferTest extends ActiveMQTestBase {
       //it is optimistic: the timeout and the blockingDeviceFlushTime are a 
perfect match
       final long deviceTime = timeout;
       final int bufferSize = Env.osPageSize();
-      final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) 
timeout, false);
+      final TimedBuffer timedBuffer = new TimedBuffer(null, bufferSize, (int) 
timeout, false);
       timedBuffer.start();
       try (BlockingObserver observer = new BlockingObserver(bufferSize, 
deviceTime)) {
          timedBuffer.setObserver(observer);
@@ -489,7 +489,7 @@ public class TimedBufferTest extends ActiveMQTestBase {
          }
       }
 
-      TimedBuffer timedBuffer = new TimedBuffer(100, 
TimedBufferTest.ONE_SECOND_IN_NANOS / 10, false);
+      TimedBuffer timedBuffer = new TimedBuffer(null, 100, 
TimedBufferTest.ONE_SECOND_IN_NANOS / 10, false);
 
       timedBuffer.start();
 

Reply via email to