ARTEMIS-1663 - Add new message count and size metrics

Adding new metrics for tracking message counts and sizes on a Queue.
This includes tracking metrics for pending, delivering and scheduled
messages.  The paging store also tracks message size now.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ea70af15
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ea70af15
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ea70af15

Branch: refs/heads/master
Commit: ea70af15a3fbfc1b6ac86589e1e2e04a79ca3e23
Parents: 2eac195
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Mon Feb 5 13:24:31 2018 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Thu Feb 8 11:35:12 2018 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  11 +-
 .../api/core/management/QueueControl.java       |  58 ++
 .../artemis/core/message/impl/CoreMessage.java  |   5 +
 .../core/message/impl/MessageInternalImpl.java  |   5 +
 .../artemis/core/journal/impl/JournalImpl.java  |   6 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  13 +-
 .../core/protocol/openwire/OpenwireMessage.java |   9 +-
 .../core/management/impl/QueueControlImpl.java  | 110 +++-
 .../artemis/core/paging/PagedMessage.java       |  10 +
 .../core/paging/cursor/PagePosition.java        |   4 +
 .../core/paging/cursor/PageSubscription.java    |   9 +
 .../paging/cursor/PageSubscriptionCounter.java  |  16 +-
 .../core/paging/cursor/PagedReferenceImpl.java  |  19 +-
 .../paging/cursor/impl/PagePositionImpl.java    |  22 +
 .../impl/PageSubscriptionCounterImpl.java       | 146 ++++-
 .../cursor/impl/PageSubscriptionImpl.java       |  69 +-
 .../core/paging/impl/PagedMessageImpl.java      |   6 +
 .../core/paging/impl/PagingStoreImpl.java       |  11 +-
 .../core/persistence/StorageManager.java        |   6 +-
 .../journal/AbstractJournalStorageManager.java  |  37 +-
 .../impl/journal/DescribeJournal.java           |  61 +-
 .../impl/journal/LargeServerMessageImpl.java    |  10 +-
 .../journal/codec/PageCountPendingImpl.java     |   5 +-
 .../impl/journal/codec/PageCountRecord.java     |  18 +-
 .../impl/journal/codec/PageCountRecordInc.java  |  18 +-
 .../impl/nullpm/NullStorageManager.java         |   6 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   2 +-
 .../core/server/ActiveMQServerLogger.java       |   4 +
 .../artemis/core/server/MessageReference.java   |  11 +
 .../activemq/artemis/core/server/Queue.java     |  34 +-
 .../core/server/ScheduledDeliveryHandler.java   |   6 +
 .../core/server/cluster/impl/BridgeImpl.java    |   2 +-
 .../core/server/impl/LastValueQueue.java        |  10 +-
 .../core/server/impl/MessageReferenceImpl.java  |   8 +-
 .../server/impl/PostOfficeJournalLoader.java    |  13 +-
 .../artemis/core/server/impl/QueueImpl.java     | 144 +++-
 .../server/impl/QueuePendingMessageMetrics.java | 147 +++++
 .../impl/ScheduledDeliveryHandlerImpl.java      |  29 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  75 ++-
 .../transaction/impl/TransactionImplTest.java   |   6 +-
 .../main/resources/metrics/queueMetrics.groovy  |  37 ++
 .../main/resources/servers/artemisServer.groovy |   6 +
 .../compatibility/JournalCompatibilityTest.java |  43 +-
 .../tests/compatibility/VersionedBaseTest.java  |   7 +-
 .../integration/client/AcknowledgeTest.java     |   5 +
 .../integration/client/SendAckFailTest.java     |  12 +-
 .../management/ManagementTestBase.java          |  15 +
 .../management/QueueControlTest.java            | 452 +++++++------
 .../management/QueueControlUsingCoreTest.java   |  55 +-
 .../integration/paging/PagingCounterTest.java   |  21 +-
 .../AbstractPersistentStatTestSupport.java      | 213 ++++++
 .../metrics/JournalPageCountSizeTest.java       | 144 ++++
 .../metrics/JournalPendingMessageTest.java      | 651 +++++++++++++++++++
 .../unit/core/postoffice/impl/FakeQueue.java    |  47 +-
 .../core/server/impl/fakes/FakeConsumer.java    |   4 +-
 55 files changed, 2502 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index d24cd95..031c426 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -657,6 +657,15 @@ public interface Message {
 
    int getMemoryEstimate();
 
-
+   /**
+    * This is the size of the message when persisted on disk which is used for 
metrics tracking
+    * Note that even if the message itself is not persisted on disk (ie 
non-durable) this value is
+    * still used for metrics tracking
+    * If a normal message it will be the encoded message size
+    * If a large message it will be encoded message size + large message body 
size
+    * @return
+    * @throws ActiveMQException
+    */
+   long getPersistentSize() throws ActiveMQException;
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 447417f..2578684 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -81,12 +81,52 @@ public interface QueueControl {
    long getMessageCount();
 
    /**
+    * Returns the persistent size of all messages currently in this queue. The 
persistent size of a message
+    * is the amount of space the message would take up on disk which is used 
to track how much data there
+    * is to consume on this queue
+    */
+   @Attribute(desc = "persistent size of all messages (including durable and 
non-durable) currently in this queue (includes scheduled, paged, and 
in-delivery messages)")
+   long getPersistentSize();
+
+   /**
+    * Returns the number of durable messages currently in this queue.
+    */
+   @Attribute(desc = "number of durable messages currently in this queue 
(includes scheduled, paged, and in-delivery messages)")
+   long getDurableMessageCount();
+
+   /**
+    * Returns the persistent size of durable messages currently in this queue. 
The persistent size of a message
+    * is the amount of space the message would take up on disk which is used 
to track how much data there
+    * is to consume on this queue
+    */
+   @Attribute(desc = "persistent size of durable messages currently in this 
queue (includes scheduled, paged, and in-delivery messages)")
+   long getDurablePersistentSize();
+
+   /**
     * Returns the number of scheduled messages in this queue.
     */
    @Attribute(desc = "number of scheduled messages in this queue")
    long getScheduledCount();
 
    /**
+    * Returns the size of scheduled messages in this queue.
+    */
+   @Attribute(desc = "persistent size of scheduled messages in this queue")
+   long getScheduledSize();
+
+   /**
+    * Returns the number of durable scheduled messages in this queue.
+    */
+   @Attribute(desc = "number of durable scheduled messages in this queue")
+   long getDurableScheduledCount();
+
+   /**
+    * Returns the size of durable scheduled messages in this queue.
+    */
+   @Attribute(desc = "persistent size of durable scheduled messages in this 
queue")
+   long getDurableScheduledSize();
+
+   /**
     * Returns the number of consumers consuming messages from this queue.
     */
    @Attribute(desc = "number of consumers consuming messages from this queue")
@@ -99,6 +139,24 @@ public interface QueueControl {
    int getDeliveringCount();
 
    /**
+    * Returns the persistent size of messages that this queue is currently 
delivering to its consumers.
+    */
+   @Attribute(desc = "persistent size of messages that this queue is currently 
delivering to its consumers")
+   long getDeliveringSize();
+
+   /**
+    * Returns the number of durable messages that this queue is currently 
delivering to its consumers.
+    */
+   @Attribute(desc = "number of durable messages that this queue is currently 
delivering to its consumers")
+   int getDurableDeliveringCount();
+
+   /**
+    * Returns the size of durable messages that this queue is currently 
delivering to its consumers.
+    */
+   @Attribute(desc = "persistent size of durable messages that this queue is 
currently delivering to its consumers")
+   long getDurableDeliveringSize();
+
+   /**
     * Returns the number of messages added to this queue since it was created.
     */
    @Attribute(desc = "number of messages added to this queue since it was 
created")

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 0fb7c3e..172cc18 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -1150,4 +1150,9 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
    private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
       return coreMessageObjectPools == null ? null : 
coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
    }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return getEncodeSize();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
index 56ff816..17cb828 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java
@@ -695,4 +695,9 @@ public class MessageInternalImpl implements MessageInternal 
{
       return new TypedProperties(message.getTypedProperties());
    }
 
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return message.getPersistentSize();
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 88204d4..34ee72e 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -191,7 +191,7 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
 
    private Executor appendExecutor = null;
 
-   private ConcurrentHashSet<CountDownLatch> latches = new 
ConcurrentHashSet<>();
+   private final ConcurrentHashSet<CountDownLatch> latches = new 
ConcurrentHashSet<>();
 
    private final ExecutorFactory providedIOThreadPool;
    protected ExecutorFactory ioExecutorFactory;
@@ -2413,7 +2413,7 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                                                  final List<JournalFile> 
newFiles,
                                                  final List<Pair<String, 
String>> renames) throws Exception {
 
-      return JournalCompactor.writeControlFile(fileFactory, files, newFiles, 
renames);
+      return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, 
newFiles, renames);
    }
 
 
@@ -2763,7 +2763,7 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
       ArrayList<String> newFiles = new ArrayList<>();
       ArrayList<Pair<String, String>> renames = new ArrayList<>();
 
-      SequentialFile controlFile = 
JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles, renames);
+      SequentialFile controlFile = 
AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, 
renames);
       if (controlFile != null) {
          for (String dataFile : dataFiles) {
             SequentialFile file = fileFactory.createSequentialFile(dataFile);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index cdab412..2d72cf9 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -24,10 +24,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
@@ -60,6 +58,10 @@ import org.apache.qpid.proton.codec.WritableBuffer;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
 // see 
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
 public class AMQPMessage extends RefCountMessage {
 
@@ -1179,4 +1181,9 @@ public class AMQPMessage extends RefCountMessage {
    private SimpleString.StringSimpleStringPool getPropertyValuesPool() {
       return coreMessageObjectPools == null ? null : 
coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool();
    }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return getEncodeSize();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index c63fe19..45e8953 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.protocol.openwire;
 
 import java.util.Set;
 
-import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
@@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 
+import io.netty.buffer.ByteBuf;
+
 // TODO: Implement this
 public class OpenwireMessage implements Message {
 
@@ -496,4 +498,9 @@ public class OpenwireMessage implements Message {
    public int getMemoryEstimate() {
       return 0;
    }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return 0;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index cefcbf9..e678ab8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -227,6 +227,42 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
    }
 
    @Override
+   public long getPersistentSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getPersistentSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getDurableMessageCount() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurableMessageCount();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getDurablePersistentSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurablePersistentSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public int getConsumerCount() {
       checkStarted();
 
@@ -251,6 +287,42 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
    }
 
    @Override
+   public long getDeliveringSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDeliveringSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public int getDurableDeliveringCount() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurableDeliveringCount();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getDurableDeliveringSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurableDeliveringSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public long getMessagesAdded() {
       checkStarted();
 
@@ -323,6 +395,42 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
    }
 
    @Override
+   public long getScheduledSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getScheduledSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getDurableScheduledCount() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurableScheduledCount();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public long getDurableScheduledSize() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getDurableScheduledSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public String getDeadLetterAddress() {
       checkStarted();
 
@@ -998,7 +1106,7 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl {
       try {
          long index = 0;
          long start = (page - 1) * pageSize;
-         long end = Math.min((long)(page * pageSize), queue.getMessageCount());
+         long end = Math.min(page * pageSize, queue.getMessageCount());
 
          ArrayList<CompositeData> c = new ArrayList<>();
          Filter thefilter = FilterImpl.createFilter(filter);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
index 0124f09..5b39691 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.paging;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -38,4 +39,13 @@ public interface PagedMessage extends EncodingSupport {
    void initMessage(StorageManager storageManager);
 
    long getTransactionID();
+
+   /**
+    * This is the size of the message when persisted on disk and is used for 
metrics tracking
+    * If a normal message it will be the encoded message size
+    * If a large message it will be encoded message size + large message body 
size
+    * @return
+    * @throws ActiveMQException
+    */
+   long getPersistentSize() throws ActiveMQException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
index 00955b7..a9e0537 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java
@@ -28,6 +28,10 @@ public interface PagePosition extends 
Comparable<PagePosition> {
 
    int getMessageNr();
 
+   long getPersistentSize();
+
+   void setPersistentSize(long persistentSize);
+
    PagePosition nextMessage();
 
    PagePosition nextPage();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index 985f563..b11362d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -44,6 +44,8 @@ public interface PageSubscription {
 
    long getMessageCount();
 
+   long getPersistentSize();
+
    long getId();
 
    boolean isPersistent();
@@ -161,4 +163,11 @@ public interface PageSubscription {
     * @throws Exception
     */
    void onDeletePage(Page deletedPage) throws Exception;
+
+   long getDeliveredCount();
+
+   long getDeliveredSize();
+
+   void incrementDeliveredSize(long size);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
index 37cdb3b..33b744f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java
@@ -26,13 +26,17 @@ public interface PageSubscriptionCounter {
 
    long getValue();
 
-   void increment(Transaction tx, int add) throws Exception;
+   long getPersistentSizeAdded();
 
-   void loadValue(long recordValueID, long value);
+   long getPersistentSize();
 
-   void loadInc(long recordInd, int add);
+   void increment(Transaction tx, int add, long persistentSize) throws 
Exception;
 
-   void applyIncrementOnTX(Transaction tx, long recordID, int add);
+   void loadValue(long recordValueID, long value, long persistentSize);
+
+   void loadInc(long recordInd, int add, long persistentSize);
+
+   void applyIncrementOnTX(Transaction tx, long recordID, int add, long 
persistentSize);
 
    /**
     * This will process the reload
@@ -43,12 +47,12 @@ public interface PageSubscriptionCounter {
     * @param id
     * @param variance
     */
-   void addInc(long id, int variance);
+   void addInc(long id, int variance, long size);
 
    // used when deleting the counter
    void delete() throws Exception;
 
-   void pendingCounter(Page page, int increment) throws Exception;
+   void pendingCounter(Page page, int increment, long persistentSize) throws 
Exception;
 
    // used when leaving page mode, so the counters are deleted in batches
    // for each queue on the address

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 42c5423..f5d49cf 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -59,6 +59,8 @@ public class PagedReferenceImpl extends 
LinkedListImpl.Node<PagedReferenceImpl>
 
    private final long messageID;
 
+   private long messageSize = -1;
+
    @Override
    public Object getProtocolData() {
       return protocolData;
@@ -104,6 +106,9 @@ public class PagedReferenceImpl extends 
LinkedListImpl.Node<PagedReferenceImpl>
       this.largeMessage = message.getMessage().isLargeMessage();
       this.transactionID = message.getTransactionID();
       this.messageID = message.getMessage().getMessageID();
+
+      //pre-cache the message size so we don't have to reload the message 
later if it is GC'd
+      getPersistentSize();
    }
 
    @Override
@@ -191,7 +196,7 @@ public class PagedReferenceImpl extends 
LinkedListImpl.Node<PagedReferenceImpl>
 
    @Override
    public void handled() {
-      getQueue().referenceHandled();
+      getQueue().referenceHandled(this);
    }
 
    @Override
@@ -280,4 +285,16 @@ public class PagedReferenceImpl extends 
LinkedListImpl.Node<PagedReferenceImpl>
       return messageID;
    }
 
+   @Override
+   public long getPersistentSize() {
+      if (messageSize == -1) {
+         try {
+            messageSize = getPagedMessage().getPersistentSize();
+         } catch (Throwable e) {
+            ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e);
+         }
+      }
+      return messageSize;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
index 076f872..52d1c83 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java
@@ -37,6 +37,12 @@ public class PagePositionImpl implements PagePosition {
    private long recordID = -1;
 
    /**
+    * Optional size value that can be set to specify the peristent size of the 
message
+    * for metrics tracking purposes
+    */
+   private long persistentSize;
+
+   /**
     * @param pageNr
     * @param messageNr
     */
@@ -82,6 +88,22 @@ public class PagePositionImpl implements PagePosition {
       return messageNr;
    }
 
+   /**
+    * @return the persistentSize
+    */
+   @Override
+   public long getPersistentSize() {
+      return persistentSize;
+   }
+
+   /**
+    * @param persistentSize the persistentSize to set
+    */
+   @Override
+   public void setPersistentSize(long persistentSize) {
+      this.persistentSize = persistentSize;
+   }
+
    @Override
    public int compareTo(PagePosition o) {
       if (pageNr > o.getPageNr()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index 01ad778..3bb56f8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -21,10 +21,10 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
-import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
 import org.apache.activemq.artemis.core.paging.impl.Page;
@@ -60,10 +60,13 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
    private final Executor executor;
 
    private final AtomicLong value = new AtomicLong(0);
+   private final AtomicLong persistentSize = new AtomicLong(0);
 
    private final AtomicLong added = new AtomicLong(0);
+   private final AtomicLong addedPersistentSize = new AtomicLong(0);
 
    private final AtomicLong pendingValue = new AtomicLong(0);
+   private final AtomicLong pendingPersistentSize = new AtomicLong(0);
 
    private final LinkedList<Long> incrementRecords = new LinkedList<>();
 
@@ -71,9 +74,9 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
    // we will recount a page case we still see pending records
    // as soon as we close a page we remove these records replacing by a 
regular page increment record
    // A Map per pageID, each page will have a set of IDs, with the increment 
on each one
-   private final Map<Long, Pair<Long, AtomicInteger>> pendingCounters = new 
HashMap<>();
+   private final Map<Long, PendingCounter> pendingCounters = new HashMap<>();
 
-   private LinkedList<Pair<Long, Integer>> loadList;
+   private LinkedList<PendingCounter> loadList;
 
    private final Runnable cleanupCheck = new Runnable() {
       @Override
@@ -104,6 +107,16 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
       return value.get() + pendingValue.get();
    }
 
+   @Override
+   public long getPersistentSizeAdded() {
+      return addedPersistentSize.get() + pendingPersistentSize.get();
+   }
+
+   @Override
+   public long getPersistentSize() {
+      return persistentSize.get() + pendingPersistentSize.get();
+   }
+
    /**
     * This is used only on non transactional paging
     *
@@ -112,24 +125,25 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
     * @throws Exception
     */
    @Override
-   public synchronized void pendingCounter(Page page, int increment) throws 
Exception {
+   public synchronized void pendingCounter(Page page, int increment, long 
size) throws Exception {
       if (!persistent) {
          return; // nothing to be done
       }
 
-      Pair<Long, AtomicInteger> pendingInfo = pendingCounters.get((long) 
page.getPageId());
+      PendingCounter pendingInfo = pendingCounters.get((long) 
page.getPageId());
       if (pendingInfo == null) {
          // We have to make sure this is sync here
          // not syncing this to disk may cause the page files to be out of 
sync on pages.
          // we can't afford the case where a page file is written without a 
record here
          long id = storage.storePendingCounter(this.subscriptionID, 
page.getPageId(), increment);
-         pendingInfo = new Pair<>(id, new AtomicInteger(1));
+         pendingInfo = new PendingCounter(id, increment, size);
          pendingCounters.put((long) page.getPageId(), pendingInfo);
       } else {
-         pendingInfo.getB().addAndGet(increment);
+         pendingInfo.addAndGet(increment, size);
       }
 
       pendingValue.addAndGet(increment);
+      pendingPersistentSize.addAndGet(size);
 
       page.addPendingCounter(this);
    }
@@ -141,23 +155,25 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
     */
    @Override
    public void cleanupNonTXCounters(final long pageID) throws Exception {
-      Pair<Long, AtomicInteger> pendingInfo;
+      PendingCounter pendingInfo;
       synchronized (this) {
          pendingInfo = pendingCounters.remove(pageID);
       }
 
       if (pendingInfo != null) {
-         final AtomicInteger valueCleaned = pendingInfo.getB();
+         final int valueCleaned = pendingInfo.getCount();
+         final long valueSizeCleaned = pendingInfo.getPersistentSize();
          Transaction tx = new TransactionImpl(storage);
-         storage.deletePendingPageCounter(tx.getID(), pendingInfo.getA());
+         storage.deletePendingPageCounter(tx.getID(), pendingInfo.getId());
 
          // To apply the increment of the value just being cleaned
-         increment(tx, valueCleaned.get());
+         increment(tx, valueCleaned, valueSizeCleaned);
 
          tx.addOperation(new TransactionOperationAbstract() {
             @Override
             public void afterCommit(Transaction tx) {
-               pendingValue.addAndGet(-valueCleaned.get());
+               pendingValue.addAndGet(-valueCleaned);
+               pendingPersistentSize.updateAndGet(val -> val >= 
valueSizeCleaned ? val - valueSizeCleaned : 0);
             }
          });
 
@@ -166,21 +182,21 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
    }
 
    @Override
-   public void increment(Transaction tx, int add) throws Exception {
+   public void increment(Transaction tx, int add, long size) throws Exception {
       if (tx == null) {
          if (persistent) {
-            long id = storage.storePageCounterInc(this.subscriptionID, add);
-            incrementProcessed(id, add);
+            long id = storage.storePageCounterInc(this.subscriptionID, add, 
size);
+            incrementProcessed(id, add, size);
          } else {
-            incrementProcessed(-1, add);
+            incrementProcessed(-1, add, size);
          }
       } else {
          if (persistent) {
             tx.setContainsPersistent();
-            long id = storage.storePageCounterInc(tx.getID(), 
this.subscriptionID, add);
-            applyIncrementOnTX(tx, id, add);
+            long id = storage.storePageCounterInc(tx.getID(), 
this.subscriptionID, add, size);
+            applyIncrementOnTX(tx, id, add, size);
          } else {
-            applyIncrementOnTX(tx, -1, add);
+            applyIncrementOnTX(tx, -1, add, size);
          }
       }
    }
@@ -193,7 +209,7 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
     * @param add
     */
    @Override
-   public void applyIncrementOnTX(Transaction tx, long recordID1, int add) {
+   public void applyIncrementOnTX(Transaction tx, long recordID1, int add, 
long size) {
       CounterOperations oper = (CounterOperations) 
tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
 
       if (oper == null) {
@@ -202,22 +218,24 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
          tx.addOperation(oper);
       }
 
-      oper.operations.add(new ItemOper(this, recordID1, add));
+      oper.operations.add(new ItemOper(this, recordID1, add, size));
    }
 
    @Override
-   public synchronized void loadValue(final long recordID1, final long value1) 
{
+   public synchronized void loadValue(final long recordID1, final long value1, 
long size) {
       if (this.subscription != null) {
          // it could be null on testcases... which is ok
          this.subscription.notEmpty();
       }
       this.value.set(value1);
       this.added.set(value1);
+      this.persistentSize.set(size);
+      this.addedPersistentSize.set(size);
       this.recordID = recordID1;
    }
 
-   public synchronized void incrementProcessed(long id, int add) {
-      addInc(id, add);
+   public synchronized void incrementProcessed(long id, int add, long size) {
+      addInc(id, add, size);
       if (incrementRecords.size() > FLUSH_COUNTER) {
          executor.execute(cleanupCheck);
       }
@@ -259,12 +277,12 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
    }
 
    @Override
-   public void loadInc(long id, int add) {
+   public void loadInc(long id, int add, long size) {
       if (loadList == null) {
          loadList = new LinkedList<>();
       }
 
-      loadList.add(new Pair<>(id, add));
+      loadList.add(new PendingCounter(id, add, size));
    }
 
    @Override
@@ -275,10 +293,12 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
             subscription.notEmpty();
          }
 
-         for (Pair<Long, Integer> incElement : loadList) {
-            value.addAndGet(incElement.getB());
-            added.addAndGet(incElement.getB());
-            incrementRecords.add(incElement.getA());
+         for (PendingCounter incElement : loadList) {
+            value.addAndGet(incElement.getCount());
+            added.addAndGet(incElement.getCount());
+            persistentSize.addAndGet(incElement.getPersistentSize());
+            addedPersistentSize.addAndGet(incElement.getPersistentSize());
+            incrementRecords.add(incElement.getId());
          }
          loadList.clear();
          loadList = null;
@@ -286,11 +306,15 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
    }
 
    @Override
-   public synchronized void addInc(long id, int variance) {
+   public synchronized void addInc(long id, int variance, long size) {
       value.addAndGet(variance);
+      this.persistentSize.addAndGet(size);
       if (variance > 0) {
          added.addAndGet(variance);
       }
+      if (size > 0) {
+         addedPersistentSize.addAndGet(size);
+      }
       if (id >= 0) {
          incrementRecords.add(id);
       }
@@ -310,11 +334,13 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
       ArrayList<Long> deleteList;
 
       long valueReplace;
+      long sizeReplace;
       synchronized (this) {
          if (incrementRecords.size() <= FLUSH_COUNTER) {
             return;
          }
          valueReplace = value.get();
+         sizeReplace = persistentSize.get();
          deleteList = new ArrayList<>(incrementRecords);
          incrementRecords.clear();
       }
@@ -332,7 +358,7 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
             storage.deletePageCounter(txCleanup, recordID);
          }
 
-         newRecordID = storage.storePageCounter(txCleanup, subscriptionID, 
valueReplace);
+         newRecordID = storage.storePageCounter(txCleanup, subscriptionID, 
valueReplace, sizeReplace);
 
          if (logger.isTraceEnabled()) {
             logger.trace("Replacing page-counter record = " + recordID + " by 
record = " + newRecordID + " on subscriptionID = " + this.subscriptionID + " 
for queue = " + this.subscription.getQueue().getName());
@@ -354,10 +380,11 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
 
    private static class ItemOper {
 
-      private ItemOper(PageSubscriptionCounterImpl counter, long id, int add) {
+      private ItemOper(PageSubscriptionCounterImpl counter, long id, int add, 
long persistentSize) {
          this.counter = counter;
          this.id = id;
          this.amount = add;
+         this.persistentSize = persistentSize;
       }
 
       PageSubscriptionCounterImpl counter;
@@ -365,6 +392,8 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
       long id;
 
       int amount;
+
+      long persistentSize;
    }
 
    private static class CounterOperations extends TransactionOperationAbstract 
implements TransactionOperation {
@@ -374,8 +403,55 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter {
       @Override
       public void afterCommit(Transaction tx) {
          for (ItemOper oper : operations) {
-            oper.counter.incrementProcessed(oper.id, oper.amount);
+            oper.counter.incrementProcessed(oper.id, oper.amount, 
oper.persistentSize);
          }
       }
    }
+
+   private static class PendingCounter {
+      private static final AtomicIntegerFieldUpdater<PendingCounter> 
COUNT_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(PendingCounter.class, 
"count");
+
+      private static final AtomicLongFieldUpdater<PendingCounter> SIZE_UPDATER 
=
+            AtomicLongFieldUpdater.newUpdater(PendingCounter.class, 
"persistentSize");
+
+      private final long id;
+      private volatile int count;
+      private volatile long persistentSize;
+
+      /**
+       * @param id
+       * @param count
+       * @param size
+       */
+      PendingCounter(long id, int count, long persistentSize) {
+         super();
+         this.id = id;
+         this.count = count;
+         this.persistentSize = persistentSize;
+      }
+      /**
+       * @return the id
+       */
+      public long getId() {
+         return id;
+      }
+      /**
+       * @return the count
+       */
+      public int getCount() {
+         return count;
+      }
+      /**
+       * @return the size
+       */
+      public long getPersistentSize() {
+         return persistentSize;
+      }
+
+      public void addAndGet(int count, long persistentSize) {
+         COUNT_UPDATER.addAndGet(this, count);
+         SIZE_UPDATER.addAndGet(this, persistentSize);
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 24c69be..924aace 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -96,6 +96,8 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    private final AtomicLong deliveredCount = new AtomicLong(0);
 
+   private final AtomicLong deliveredSize = new AtomicLong(0);
+
    PageSubscriptionImpl(final PageCursorProvider cursorProvider,
                         final PagingStore pageStore,
                         final StorageManager store,
@@ -178,6 +180,18 @@ final class PageSubscriptionImpl implements 
PageSubscription {
    }
 
    @Override
+   public long getPersistentSize() {
+      if (empty) {
+         return 0;
+      } else {
+         //A negative value could happen if an old journal was loaded that 
didn't have
+         //size metrics for old records
+         long messageSize = counter.getPersistentSize() - deliveredSize.get();
+         return messageSize > 0 ? messageSize : 0;
+      }
+   }
+
+   @Override
    public PageSubscriptionCounter getCounter() {
       return counter;
    }
@@ -439,7 +453,7 @@ final class PageSubscriptionImpl implements 
PageSubscription {
    public void ackTx(final Transaction tx, final PagedReference reference) 
throws Exception {
       confirmPosition(tx, reference.getPosition());
 
-      counter.increment(tx, -1);
+      counter.increment(tx, -1, -getPersistentSize(reference));
 
       PageTransactionInfo txInfo = getPageTransaction(reference);
       if (txInfo != null) {
@@ -831,6 +845,12 @@ final class PageSubscriptionImpl implements 
PageSubscription {
       }
 
       PageCursorInfo info = getPageInfo(position);
+      PageCache cache = info.getCache();
+      long size = 0;
+      if (cache != null) {
+         size = getPersistentSize(cache.getMessage(position.getMessageNr()));
+         position.setPersistentSize(size);
+      }
 
       logger.tracef("InstallTXCallback looking up pagePosition %s, result=%s", 
position, info);
 
@@ -1060,6 +1080,13 @@ final class PageSubscriptionImpl implements 
PageSubscription {
          }
       }
 
+      /**
+       * @return the cache
+       */
+      public PageCache getCache() {
+         return cache != null ? cache.get() : null;
+      }
+
    }
 
    private final class PageCursorTX extends TransactionOperationAbstract {
@@ -1087,6 +1114,7 @@ final class PageSubscriptionImpl implements 
PageSubscription {
             for (PagePosition confirmed : positions) {
                cursor.processACK(confirmed);
                cursor.deliveredCount.decrementAndGet();
+               cursor.deliveredSize.addAndGet(-confirmed.getPersistentSize());
             }
 
          }
@@ -1309,4 +1337,43 @@ final class PageSubscriptionImpl implements 
PageSubscription {
       public void close() {
       }
    }
+
+   /**
+    * @return the deliveredCount
+    */
+   @Override
+   public long getDeliveredCount() {
+      return deliveredCount.get();
+   }
+
+   /**
+    * @return the deliveredSize
+    */
+   @Override
+   public long getDeliveredSize() {
+      return deliveredSize.get();
+   }
+
+   @Override
+   public void incrementDeliveredSize(long size) {
+      deliveredSize.addAndGet(size);
+   }
+
+   private long getPersistentSize(PagedMessage msg) {
+      try {
+         return msg != null && msg.getPersistentSize() > 0 ? 
msg.getPersistentSize() : 0;
+      } catch (ActiveMQException e) {
+         logger.warn("Error computing persistent size of message: " + msg, e);
+         return 0;
+      }
+   }
+
+   private long getPersistentSize(PagedReference ref) {
+      try {
+         return ref != null && ref.getPersistentSize() > 0 ? 
ref.getPersistentSize() : 0;
+      } catch (ActiveMQException e) {
+         logger.warn("Error computing persistent size of message: " + ref, e);
+         return 0;
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
index d7bd05c..3ef833d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
@@ -169,4 +170,9 @@ public class PagedMessageImpl implements PagedMessage {
          message +
          "]";
    }
+
+   @Override
+   public long getPersistentSize() throws ActiveMQException {
+      return message.getPersistentSize();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index f1beb31..0eec5a0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -840,7 +840,8 @@ public class PagingStoreImpl implements PagingStore {
             // the apply counter will make sure we write a record on journal
             // especially on the case for non transactional sends and paging
             // doing this will give us a possibility of recovering the page 
counters
-            applyPageCounters(tx, getCurrentPage(), listCtx);
+            long persistentSize = pagedMessage.getPersistentSize() > 0 ? 
pagedMessage.getPersistentSize() : 0;
+            applyPageCounters(tx, getCurrentPage(), listCtx, persistentSize);
 
             currentPage.write(pagedMessage);
 
@@ -906,22 +907,22 @@ public class PagingStoreImpl implements PagingStore {
     * @param ctx
     * @throws Exception
     */
-   private void applyPageCounters(Transaction tx, Page page, RouteContextList 
ctx) throws Exception {
+   private void applyPageCounters(Transaction tx, Page page, RouteContextList 
ctx, long size) throws Exception {
       List<org.apache.activemq.artemis.core.server.Queue> durableQueues = 
ctx.getDurableQueues();
       List<org.apache.activemq.artemis.core.server.Queue> nonDurableQueues = 
ctx.getNonDurableQueues();
       for (org.apache.activemq.artemis.core.server.Queue q : durableQueues) {
          if (tx == null) {
             // non transactional writes need an intermediate place
             // to avoid the counter getting out of sync
-            q.getPageSubscription().getCounter().pendingCounter(page, 1);
+            q.getPageSubscription().getCounter().pendingCounter(page, 1, size);
          } else {
             // null tx is treated through pending counters
-            q.getPageSubscription().getCounter().increment(tx, 1);
+            q.getPageSubscription().getCounter().increment(tx, 1, size);
          }
       }
 
       for (org.apache.activemq.artemis.core.server.Queue q : nonDurableQueues) 
{
-         q.getPageSubscription().getCounter().increment(tx, 1);
+         q.getPageSubscription().getCounter().increment(tx, 1, size);
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 6defb1e..f9793d8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -336,7 +336,7 @@ public interface StorageManager extends IDGenerator, 
ActiveMQComponent {
    /**
     * @return The ID with the stored counter
     */
-   long storePageCounter(long txID, long queueID, long value) throws Exception;
+   long storePageCounter(long txID, long queueID, long value, long 
persistentSize) throws Exception;
 
    long storePendingCounter(long queueID, long pageID, int inc) throws 
Exception;
 
@@ -350,13 +350,13 @@ public interface StorageManager extends IDGenerator, 
ActiveMQComponent {
     * @return the ID with the increment record
     * @throws Exception
     */
-   long storePageCounterInc(long txID, long queueID, int add) throws Exception;
+   long storePageCounterInc(long txID, long queueID, int add, long 
persistentSize) throws Exception;
 
    /**
     * @return the ID with the increment record
     * @throws Exception
     */
-   long storePageCounterInc(long queueID, int add) throws Exception;
+   long storePageCounterInc(long queueID, int add, long size) throws Exception;
 
    /**
     * @return the bindings journal

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 34d249e..ada5b90 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -16,7 +16,13 @@
  */
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
-import javax.transaction.xa.Xid;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.security.DigestInputStream;
@@ -37,6 +43,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Message;
@@ -109,13 +117,6 @@ import 
org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.jboss.logging.Logger;
 
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
-
 /**
  * Controls access to the journals and other storage files such as the ones 
used to store pages and
  * large messages.  This class must control writing of any non-transient data, 
as it is the key point
@@ -1084,7 +1085,7 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
                   PageSubscription sub = 
locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, 
pagingManager);
 
                   if (sub != null) {
-                     sub.getCounter().loadValue(record.id, 
encoding.getValue());
+                     sub.getCounter().loadValue(record.id, 
encoding.getValue(), encoding.getPersistentSize());
                   } else {
                      
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
                      messageJournal.appendDeleteRecord(record.id, false);
@@ -1101,7 +1102,7 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
                   PageSubscription sub = 
locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, 
pagingManager);
 
                   if (sub != null) {
-                     sub.getCounter().loadInc(record.id, encoding.getValue());
+                     sub.getCounter().loadInc(record.id, encoding.getValue(), 
encoding.getPersistentSize());
                   } else {
                      
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID());
                      messageJournal.appendDeleteRecord(record.id, false);
@@ -1136,6 +1137,7 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
                case JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER: {
 
                   PageCountPendingImpl pendingCountEncoding = new 
PageCountPendingImpl();
+
                   pendingCountEncoding.decode(buff);
                   pendingCountEncoding.setID(record.id);
 
@@ -1143,6 +1145,7 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
                   if (pendingNonTXPageCounter != null) {
                      pendingNonTXPageCounter.add(pendingCountEncoding);
                   }
+
                   break;
                }
 
@@ -1349,11 +1352,11 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
    }
 
    @Override
-   public long storePageCounterInc(long txID, long queueID, int value) throws 
Exception {
+   public long storePageCounterInc(long txID, long queueID, int value, long 
persistentSize) throws Exception {
       readLock();
       try {
          long recordID = idGenerator.generateID();
-         messageJournal.appendAddRecordTransactional(txID, recordID, 
JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, 
value));
+         messageJournal.appendAddRecordTransactional(txID, recordID, 
JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, 
value, persistentSize));
          return recordID;
       } finally {
          readUnLock();
@@ -1361,11 +1364,11 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
    }
 
    @Override
-   public long storePageCounterInc(long queueID, int value) throws Exception {
+   public long storePageCounterInc(long queueID, int value, long 
persistentSize) throws Exception {
       readLock();
       try {
          final long recordID = idGenerator.generateID();
-         messageJournal.appendAddRecord(recordID, 
JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, 
value), true, getContext());
+         messageJournal.appendAddRecord(recordID, 
JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, 
value, persistentSize), true, getContext());
          return recordID;
       } finally {
          readUnLock();
@@ -1373,11 +1376,11 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
    }
 
    @Override
-   public long storePageCounter(long txID, long queueID, long value) throws 
Exception {
+   public long storePageCounter(long txID, long queueID, long value, long 
persistentSize) throws Exception {
       readLock();
       try {
          final long recordID = idGenerator.generateID();
-         messageJournal.appendAddRecordTransactional(txID, recordID, 
JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, 
value));
+         messageJournal.appendAddRecordTransactional(txID, recordID, 
JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value, 
persistentSize));
          return recordID;
       } finally {
          readUnLock();
@@ -1789,7 +1792,7 @@ public abstract class AbstractJournalStorageManager 
extends CriticalComponentImp
                   PageSubscription sub = 
locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, 
pagingManager);
 
                   if (sub != null) {
-                     sub.getCounter().applyIncrementOnTX(tx, record.id, 
encoding.getValue());
+                     sub.getCounter().applyIncrementOnTX(tx, record.id, 
encoding.getValue(), encoding.getPersistentSize());
                      sub.notEmpty();
                   } else {
                      
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index bfabc25..6cd417b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -16,7 +16,29 @@
  */
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
-import javax.transaction.xa.Xid;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ID_COUNTER_RECORD;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COMPLETE;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
+import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
+
 import java.io.File;
 import java.io.PrintStream;
 import java.util.HashMap;
@@ -24,6 +46,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Message;
@@ -58,29 +82,6 @@ import 
org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.XidCodecSupport;
 
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ID_COUNTER_RECORD;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COMPLETE;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
-import static 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD;
-
 
 /**
  * Outputs a String description of the Journals contents.
@@ -217,9 +218,9 @@ public final class DescribeJournal {
                      out.println("####### Counter replace wrongly on queue " + 
queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + 
encoding.getValue());
                   }
 
-                  subsCounter.loadValue(info.id, encoding.getValue());
+                  subsCounter.loadValue(info.id, encoding.getValue(), 
encoding.getPersistentSize());
                   subsCounter.processReload();
-                  out.print("#Counter queue " + queueIDForCounter + " value=" 
+ subsCounter.getValue() + ", result=" + subsCounter.getValue());
+                  out.print("#Counter queue " + queueIDForCounter + " value=" 
+ subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() 
+ ", result=" + subsCounter.getValue());
                   if (subsCounter.getValue() < 0) {
                      out.println(" #NegativeCounter!!!!");
                   } else {
@@ -232,9 +233,9 @@ public final class DescribeJournal {
 
                   PageSubscriptionCounterImpl subsCounter = 
lookupCounter(counters, queueIDForCounter);
 
-                  subsCounter.loadInc(info.id, encoding.getValue());
+                  subsCounter.loadInc(info.id, encoding.getValue(), 
encoding.getPersistentSize());
                   subsCounter.processReload();
-                  out.print("#Counter queue " + queueIDForCounter + " value=" 
+ subsCounter.getValue() + " increased by " + encoding.getValue());
+                  out.print("#Counter queue " + queueIDForCounter + " value=" 
+ subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() 
+ " increased by " + encoding.getValue());
                   if (subsCounter.getValue() < 0) {
                      out.println(" #NegativeCounter!!!!");
                   } else {
@@ -321,7 +322,7 @@ public final class DescribeJournal {
 
             subsCounter = lookupCounter(counters, queueIDForCounter);
 
-            subsCounter.loadValue(info.id, encoding.getValue());
+            subsCounter.loadValue(info.id, encoding.getValue(), 
encoding.getPersistentSize());
             subsCounter.processReload();
          } else if (info.getUserRecordType() == 
JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
             PageCountRecordInc encoding = (PageCountRecordInc) o;
@@ -329,7 +330,7 @@ public final class DescribeJournal {
 
             subsCounter = lookupCounter(counters, queueIDForCounter);
 
-            subsCounter.loadInc(info.id, encoding.getValue());
+            subsCounter.loadInc(info.id, encoding.getValue(), 
encoding.getPersistentSize());
             subsCounter.processReload();
          }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 433031c..dabb039 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -19,7 +19,6 @@ package 
org.apache.activemq.artemis.core.persistence.impl.journal;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -35,6 +34,8 @@ import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.jboss.logging.Logger;
 
+import io.netty.buffer.Unpooled;
+
 public final class LargeServerMessageImpl extends CoreMessage implements 
LargeServerMessage {
 
    // Constants -----------------------------------------------------
@@ -345,6 +346,13 @@ public final class LargeServerMessageImpl extends 
CoreMessage implements LargeSe
    }
 
    @Override
+   public long getPersistentSize() throws ActiveMQException {
+      long size = super.getPersistentSize();
+      size += getBodyEncoder().getLargeBodySize();
+
+      return size;
+   }
+   @Override
    public String toString() {
       return "LargeServerMessage[messageID=" + messageID + ",durable=" + 
isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
          ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + 
toDate(getExpiration()) +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java
index 56e8c87..e600d46 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java
@@ -23,9 +23,10 @@ import org.apache.activemq.artemis.utils.DataConstants;
 
 public class PageCountPendingImpl implements EncodingSupport, PageCountPending 
{
 
+
    @Override
    public String toString() {
-      return "PageCountPending [queueID=" + queueID + ", pageID=" + pageID + 
"]";
+      return "PageCountPendingImpl [queueID=" + queueID + ", pageID=" + pageID 
+ "]";
    }
 
    public PageCountPendingImpl() {
@@ -64,7 +65,7 @@ public class PageCountPendingImpl implements EncodingSupport, 
PageCountPending {
 
    @Override
    public int getEncodeSize() {
-      return DataConstants.SIZE_LONG * 2;
+      return DataConstants.SIZE_LONG * 3;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java
index 642feb2..af9e135 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java
@@ -26,18 +26,21 @@ public class PageCountRecord implements EncodingSupport {
 
    private long value;
 
+   private long persistentSize;
+
    @Override
    public String toString() {
-      return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
+      return "PageCountRecord [queueID=" + queueID + ", value=" + value + ", 
persistentSize=" + persistentSize + "]";
    }
 
    public PageCountRecord() {
 
    }
 
-   public PageCountRecord(long queueID, long value) {
+   public PageCountRecord(long queueID, long value, long persistentSize) {
       this.queueID = queueID;
       this.value = value;
+      this.persistentSize = persistentSize;
    }
 
    public long getQueueID() {
@@ -48,21 +51,30 @@ public class PageCountRecord implements EncodingSupport {
       return value;
    }
 
+   public long getPersistentSize() {
+      return persistentSize;
+   }
+
    @Override
    public int getEncodeSize() {
-      return DataConstants.SIZE_LONG * 2;
+      return DataConstants.SIZE_LONG * 3;
    }
 
    @Override
    public void encode(ActiveMQBuffer buffer) {
       buffer.writeLong(queueID);
       buffer.writeLong(value);
+      buffer.writeLong(persistentSize);
    }
 
    @Override
    public void decode(ActiveMQBuffer buffer) {
       queueID = buffer.readLong();
       value = buffer.readLong();
+
+      if (buffer.readableBytes() > 0) {
+         persistentSize = buffer.readLong();
+      }
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java
index e427d68..c174155 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java
@@ -26,17 +26,20 @@ public class PageCountRecordInc implements EncodingSupport {
 
    private int value;
 
+   private long persistentSize;
+
    @Override
    public String toString() {
-      return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + 
"]";
+      return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + 
", persistentSize=" + persistentSize + "]";
    }
 
    public PageCountRecordInc() {
    }
 
-   public PageCountRecordInc(long queueID, int value) {
+   public PageCountRecordInc(long queueID, int value, long persistentSize) {
       this.queueID = queueID;
       this.value = value;
+      this.persistentSize = persistentSize;
    }
 
    public long getQueueID() {
@@ -47,21 +50,30 @@ public class PageCountRecordInc implements EncodingSupport {
       return value;
    }
 
+   public long getPersistentSize() {
+      return persistentSize;
+   }
+
    @Override
    public int getEncodeSize() {
-      return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+      return 2 * DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
    }
 
    @Override
    public void encode(ActiveMQBuffer buffer) {
       buffer.writeLong(queueID);
       buffer.writeInt(value);
+      buffer.writeLong(persistentSize);
    }
 
    @Override
    public void decode(ActiveMQBuffer buffer) {
       queueID = buffer.readLong();
       value = buffer.readInt();
+
+      if (buffer.readableBytes() > 0) {
+         persistentSize = buffer.readLong();
+      }
    }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index 32f9010..8c5e11c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -467,7 +467,7 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
-   public long storePageCounter(final long txID, final long queueID, final 
long value) throws Exception {
+   public long storePageCounter(final long txID, final long queueID, final 
long value, final long size) throws Exception {
       return 0;
    }
 
@@ -489,12 +489,12 @@ public class NullStorageManager implements StorageManager 
{
    }
 
    @Override
-   public long storePageCounterInc(final long txID, final long queueID, final 
int add) throws Exception {
+   public long storePageCounterInc(final long txID, final long queueID, final 
int add, final long size) throws Exception {
       return 0;
    }
 
    @Override
-   public long storePageCounterInc(final long queueID, final int add) throws 
Exception {
+   public long storePageCounterInc(final long queueID, final int add, final 
long size) throws Exception {
       return 0;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 12c722a..73d6953 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1480,7 +1480,7 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       public void afterPrepare(final Transaction tx) {
          for (MessageReference ref : refs) {
             if (ref.isAlreadyAcked()) {
-               ref.getQueue().referenceHandled();
+               ref.getQueue().referenceHandled(ref);
                ref.getQueue().incrementMesssagesAdded();
             }
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 7ae1ee4..10c827e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1911,4 +1911,8 @@ public interface ActiveMQServerLogger extends BasicLogger 
{
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224088, value = "Timeout ({0} seconds) while handshaking has 
occurred.", format = Message.Format.MESSAGE_FORMAT)
    void handshakeTimeout(int timeout);
+
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 224089, value = "Failed to calculate persistent size", format 
= Message.Format.MESSAGE_FORMAT)
+   void errorCalculatePersistentSize(@Cause Throwable e);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 906ea7e..d9145b1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server;
 
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
@@ -99,4 +100,14 @@ public interface MessageReference {
    void setAlreadyAcked();
 
    boolean isAlreadyAcked();
+
+   /**
+    * This is the size of the message when persisted on disk which is used for 
metrics tracking
+    * Note that even if the message itself is not persisted on disk (ie 
non-durable) this value is
+    * still used for metrics tracking for the amount of data on a queue
+    *
+    * @return
+    * @throws ActiveMQException
+    */
+   long getPersistentSize() throws ActiveMQException;
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index ff4e82b..c355dbf 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -138,12 +138,42 @@ public interface Queue extends Bindable,CriticalComponent 
{
 
    long getMessageCount();
 
+   /**
+    * This is the size of the messages in the queue when persisted on disk 
which is used for metrics tracking
+    * to give an idea of the amount of data on the queue to be consumed
+    *
+    * Note that this includes all messages on the queue, even messages that 
are non-durable which may only be in memory
+    */
+   long getPersistentSize();
+
+   /**
+    * This is the number of the durable messages in the queue
+    */
+   long getDurableMessageCount();
+
+   /**
+    * This is the persistent size of all the durable messages in the queue
+    */
+   long getDurablePersistentSize();
+
    int getDeliveringCount();
 
-   void referenceHandled();
+   long getDeliveringSize();
+
+   int getDurableDeliveringCount();
+
+   long getDurableDeliveringSize();
+
+   void referenceHandled(MessageReference ref);
 
    int getScheduledCount();
 
+   long getScheduledSize();
+
+   int getDurableScheduledCount();
+
+   long getDurableScheduledSize();
+
    List<MessageReference> getScheduledMessages();
 
    /**
@@ -314,8 +344,6 @@ public interface Queue extends Bindable,CriticalComponent {
     */
    SimpleString getUser();
 
-   void decDelivering(int size);
-
    /** This is to perform a check on the counter again */
    void recheckRefCount(OperationContext context);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
index 62fad5e..1dc2eda 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java
@@ -27,6 +27,12 @@ public interface ScheduledDeliveryHandler {
 
    int getScheduledCount();
 
+   long getScheduledSize();
+
+   int getDurableScheduledCount();
+
+   long getDurableScheduledSize();
+
    List<MessageReference> getScheduledReferences();
 
    List<MessageReference> cancel(Filter filter) throws ActiveMQException;

Reply via email to