ARTEMIS-1663 - Add new message count and size metrics Adding new metrics for tracking message counts and sizes on a Queue. This includes tracking metrics for pending, delivering and scheduled messages. The paging store also tracks message size now.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ea70af15 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ea70af15 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ea70af15 Branch: refs/heads/master Commit: ea70af15a3fbfc1b6ac86589e1e2e04a79ca3e23 Parents: 2eac195 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Mon Feb 5 13:24:31 2018 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Feb 8 11:35:12 2018 -0500 ---------------------------------------------------------------------- .../activemq/artemis/api/core/Message.java | 11 +- .../api/core/management/QueueControl.java | 58 ++ .../artemis/core/message/impl/CoreMessage.java | 5 + .../core/message/impl/MessageInternalImpl.java | 5 + .../artemis/core/journal/impl/JournalImpl.java | 6 +- .../protocol/amqp/broker/AMQPMessage.java | 13 +- .../core/protocol/openwire/OpenwireMessage.java | 9 +- .../core/management/impl/QueueControlImpl.java | 110 +++- .../artemis/core/paging/PagedMessage.java | 10 + .../core/paging/cursor/PagePosition.java | 4 + .../core/paging/cursor/PageSubscription.java | 9 + .../paging/cursor/PageSubscriptionCounter.java | 16 +- .../core/paging/cursor/PagedReferenceImpl.java | 19 +- .../paging/cursor/impl/PagePositionImpl.java | 22 + .../impl/PageSubscriptionCounterImpl.java | 146 ++++- .../cursor/impl/PageSubscriptionImpl.java | 69 +- .../core/paging/impl/PagedMessageImpl.java | 6 + .../core/paging/impl/PagingStoreImpl.java | 11 +- .../core/persistence/StorageManager.java | 6 +- .../journal/AbstractJournalStorageManager.java | 37 +- .../impl/journal/DescribeJournal.java | 61 +- .../impl/journal/LargeServerMessageImpl.java | 10 +- .../journal/codec/PageCountPendingImpl.java | 5 +- .../impl/journal/codec/PageCountRecord.java | 18 +- .../impl/journal/codec/PageCountRecordInc.java | 18 +- .../impl/nullpm/NullStorageManager.java | 6 +- .../core/postoffice/impl/PostOfficeImpl.java | 2 +- .../core/server/ActiveMQServerLogger.java | 4 + .../artemis/core/server/MessageReference.java | 11 + .../activemq/artemis/core/server/Queue.java | 34 +- .../core/server/ScheduledDeliveryHandler.java | 6 + .../core/server/cluster/impl/BridgeImpl.java | 2 +- .../core/server/impl/LastValueQueue.java | 10 +- .../core/server/impl/MessageReferenceImpl.java | 8 +- .../server/impl/PostOfficeJournalLoader.java | 13 +- .../artemis/core/server/impl/QueueImpl.java | 144 +++- .../server/impl/QueuePendingMessageMetrics.java | 147 +++++ .../impl/ScheduledDeliveryHandlerImpl.java | 29 +- .../impl/ScheduledDeliveryHandlerTest.java | 75 ++- .../transaction/impl/TransactionImplTest.java | 6 +- .../main/resources/metrics/queueMetrics.groovy | 37 ++ .../main/resources/servers/artemisServer.groovy | 6 + .../compatibility/JournalCompatibilityTest.java | 43 +- .../tests/compatibility/VersionedBaseTest.java | 7 +- .../integration/client/AcknowledgeTest.java | 5 + .../integration/client/SendAckFailTest.java | 12 +- .../management/ManagementTestBase.java | 15 + .../management/QueueControlTest.java | 452 +++++++------ .../management/QueueControlUsingCoreTest.java | 55 +- .../integration/paging/PagingCounterTest.java | 21 +- .../AbstractPersistentStatTestSupport.java | 213 ++++++ .../metrics/JournalPageCountSizeTest.java | 144 ++++ .../metrics/JournalPendingMessageTest.java | 651 +++++++++++++++++++ .../unit/core/postoffice/impl/FakeQueue.java | 47 +- .../core/server/impl/fakes/FakeConsumer.java | 4 +- 55 files changed, 2502 insertions(+), 391 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index d24cd95..031c426 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -657,6 +657,15 @@ public interface Message { int getMemoryEstimate(); - + /** + * This is the size of the message when persisted on disk which is used for metrics tracking + * Note that even if the message itself is not persisted on disk (ie non-durable) this value is + * still used for metrics tracking + * If a normal message it will be the encoded message size + * If a large message it will be encoded message size + large message body size + * @return + * @throws ActiveMQException + */ + long getPersistentSize() throws ActiveMQException; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 447417f..2578684 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -81,12 +81,52 @@ public interface QueueControl { long getMessageCount(); /** + * Returns the persistent size of all messages currently in this queue. The persistent size of a message + * is the amount of space the message would take up on disk which is used to track how much data there + * is to consume on this queue + */ + @Attribute(desc = "persistent size of all messages (including durable and non-durable) currently in this queue (includes scheduled, paged, and in-delivery messages)") + long getPersistentSize(); + + /** + * Returns the number of durable messages currently in this queue. + */ + @Attribute(desc = "number of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)") + long getDurableMessageCount(); + + /** + * Returns the persistent size of durable messages currently in this queue. The persistent size of a message + * is the amount of space the message would take up on disk which is used to track how much data there + * is to consume on this queue + */ + @Attribute(desc = "persistent size of durable messages currently in this queue (includes scheduled, paged, and in-delivery messages)") + long getDurablePersistentSize(); + + /** * Returns the number of scheduled messages in this queue. */ @Attribute(desc = "number of scheduled messages in this queue") long getScheduledCount(); /** + * Returns the size of scheduled messages in this queue. + */ + @Attribute(desc = "persistent size of scheduled messages in this queue") + long getScheduledSize(); + + /** + * Returns the number of durable scheduled messages in this queue. + */ + @Attribute(desc = "number of durable scheduled messages in this queue") + long getDurableScheduledCount(); + + /** + * Returns the size of durable scheduled messages in this queue. + */ + @Attribute(desc = "persistent size of durable scheduled messages in this queue") + long getDurableScheduledSize(); + + /** * Returns the number of consumers consuming messages from this queue. */ @Attribute(desc = "number of consumers consuming messages from this queue") @@ -99,6 +139,24 @@ public interface QueueControl { int getDeliveringCount(); /** + * Returns the persistent size of messages that this queue is currently delivering to its consumers. + */ + @Attribute(desc = "persistent size of messages that this queue is currently delivering to its consumers") + long getDeliveringSize(); + + /** + * Returns the number of durable messages that this queue is currently delivering to its consumers. + */ + @Attribute(desc = "number of durable messages that this queue is currently delivering to its consumers") + int getDurableDeliveringCount(); + + /** + * Returns the size of durable messages that this queue is currently delivering to its consumers. + */ + @Attribute(desc = "persistent size of durable messages that this queue is currently delivering to its consumers") + long getDurableDeliveringSize(); + + /** * Returns the number of messages added to this queue since it was created. */ @Attribute(desc = "number of messages added to this queue since it was created") http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 0fb7c3e..172cc18 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -1150,4 +1150,9 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { private SimpleString.StringSimpleStringPool getPropertyValuesPool() { return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool(); } + + @Override + public long getPersistentSize() throws ActiveMQException { + return getEncodeSize(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java index 56ff816..17cb828 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java @@ -695,4 +695,9 @@ public class MessageInternalImpl implements MessageInternal { return new TypedProperties(message.getTypedProperties()); } + @Override + public long getPersistentSize() throws ActiveMQException { + return message.getPersistentSize(); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java ---------------------------------------------------------------------- diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 88204d4..34ee72e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -191,7 +191,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal private Executor appendExecutor = null; - private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>(); + private final ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>(); private final ExecutorFactory providedIOThreadPool; protected ExecutorFactory ioExecutorFactory; @@ -2413,7 +2413,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final List<JournalFile> newFiles, final List<Pair<String, String>> renames) throws Exception { - return JournalCompactor.writeControlFile(fileFactory, files, newFiles, renames); + return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, newFiles, renames); } @@ -2763,7 +2763,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal ArrayList<String> newFiles = new ArrayList<>(); ArrayList<Pair<String, String>> renames = new ArrayList<>(); - SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles, renames); + SequentialFile controlFile = AbstractJournalUpdateTask.readControlFile(fileFactory, dataFiles, newFiles, renames); if (controlFile != null) { for (String dataFile : dataFiles) { SequentialFile file = fileFactory.createSequentialFile(dataFile); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index cdab412..2d72cf9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -24,10 +24,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.RefCountMessage; @@ -60,6 +58,10 @@ import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; + // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPMessage extends RefCountMessage { @@ -1179,4 +1181,9 @@ public class AMQPMessage extends RefCountMessage { private SimpleString.StringSimpleStringPool getPropertyValuesPool() { return coreMessageObjectPools == null ? null : coreMessageObjectPools.getPropertiesStringSimpleStringPools().getPropertyValuesPool(); } + + @Override + public long getPersistentSize() throws ActiveMQException { + return getEncodeSize(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java index c63fe19..45e8953 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.protocol.openwire; import java.util.Set; -import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; @@ -29,6 +29,8 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; +import io.netty.buffer.ByteBuf; + // TODO: Implement this public class OpenwireMessage implements Message { @@ -496,4 +498,9 @@ public class OpenwireMessage implements Message { public int getMemoryEstimate() { return 0; } + + @Override + public long getPersistentSize() throws ActiveMQException { + return 0; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index cefcbf9..e678ab8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -227,6 +227,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } @Override + public long getPersistentSize() { + checkStarted(); + + clearIO(); + try { + return queue.getPersistentSize(); + } finally { + blockOnIO(); + } + } + + @Override + public long getDurableMessageCount() { + checkStarted(); + + clearIO(); + try { + return queue.getDurableMessageCount(); + } finally { + blockOnIO(); + } + } + + @Override + public long getDurablePersistentSize() { + checkStarted(); + + clearIO(); + try { + return queue.getDurablePersistentSize(); + } finally { + blockOnIO(); + } + } + + @Override public int getConsumerCount() { checkStarted(); @@ -251,6 +287,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } @Override + public long getDeliveringSize() { + checkStarted(); + + clearIO(); + try { + return queue.getDeliveringSize(); + } finally { + blockOnIO(); + } + } + + @Override + public int getDurableDeliveringCount() { + checkStarted(); + + clearIO(); + try { + return queue.getDurableDeliveringCount(); + } finally { + blockOnIO(); + } + } + + @Override + public long getDurableDeliveringSize() { + checkStarted(); + + clearIO(); + try { + return queue.getDurableDeliveringSize(); + } finally { + blockOnIO(); + } + } + + @Override public long getMessagesAdded() { checkStarted(); @@ -323,6 +395,42 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { } @Override + public long getScheduledSize() { + checkStarted(); + + clearIO(); + try { + return queue.getScheduledSize(); + } finally { + blockOnIO(); + } + } + + @Override + public long getDurableScheduledCount() { + checkStarted(); + + clearIO(); + try { + return queue.getDurableScheduledCount(); + } finally { + blockOnIO(); + } + } + + @Override + public long getDurableScheduledSize() { + checkStarted(); + + clearIO(); + try { + return queue.getDurableScheduledSize(); + } finally { + blockOnIO(); + } + } + + @Override public String getDeadLetterAddress() { checkStarted(); @@ -998,7 +1106,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { try { long index = 0; long start = (page - 1) * pageSize; - long end = Math.min((long)(page * pageSize), queue.getMessageCount()); + long end = Math.min(page * pageSize, queue.getMessageCount()); ArrayList<CompositeData> c = new ArrayList<>(); Filter thefilter = FilterImpl.createFilter(filter); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java index 0124f09..5b39691 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagedMessage.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.paging; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -38,4 +39,13 @@ public interface PagedMessage extends EncodingSupport { void initMessage(StorageManager storageManager); long getTransactionID(); + + /** + * This is the size of the message when persisted on disk and is used for metrics tracking + * If a normal message it will be the encoded message size + * If a large message it will be encoded message size + large message body size + * @return + * @throws ActiveMQException + */ + long getPersistentSize() throws ActiveMQException; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java index 00955b7..a9e0537 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagePosition.java @@ -28,6 +28,10 @@ public interface PagePosition extends Comparable<PagePosition> { int getMessageNr(); + long getPersistentSize(); + + void setPersistentSize(long persistentSize); + PagePosition nextMessage(); PagePosition nextPage(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java index 985f563..b11362d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java @@ -44,6 +44,8 @@ public interface PageSubscription { long getMessageCount(); + long getPersistentSize(); + long getId(); boolean isPersistent(); @@ -161,4 +163,11 @@ public interface PageSubscription { * @throws Exception */ void onDeletePage(Page deletedPage) throws Exception; + + long getDeliveredCount(); + + long getDeliveredSize(); + + void incrementDeliveredSize(long size); + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java index 37cdb3b..33b744f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscriptionCounter.java @@ -26,13 +26,17 @@ public interface PageSubscriptionCounter { long getValue(); - void increment(Transaction tx, int add) throws Exception; + long getPersistentSizeAdded(); - void loadValue(long recordValueID, long value); + long getPersistentSize(); - void loadInc(long recordInd, int add); + void increment(Transaction tx, int add, long persistentSize) throws Exception; - void applyIncrementOnTX(Transaction tx, long recordID, int add); + void loadValue(long recordValueID, long value, long persistentSize); + + void loadInc(long recordInd, int add, long persistentSize); + + void applyIncrementOnTX(Transaction tx, long recordID, int add, long persistentSize); /** * This will process the reload @@ -43,12 +47,12 @@ public interface PageSubscriptionCounter { * @param id * @param variance */ - void addInc(long id, int variance); + void addInc(long id, int variance, long size); // used when deleting the counter void delete() throws Exception; - void pendingCounter(Page page, int increment) throws Exception; + void pendingCounter(Page page, int increment, long persistentSize) throws Exception; // used when leaving page mode, so the counters are deleted in batches // for each queue on the address http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 42c5423..f5d49cf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -59,6 +59,8 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> private final long messageID; + private long messageSize = -1; + @Override public Object getProtocolData() { return protocolData; @@ -104,6 +106,9 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> this.largeMessage = message.getMessage().isLargeMessage(); this.transactionID = message.getTransactionID(); this.messageID = message.getMessage().getMessageID(); + + //pre-cache the message size so we don't have to reload the message later if it is GC'd + getPersistentSize(); } @Override @@ -191,7 +196,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> @Override public void handled() { - getQueue().referenceHandled(); + getQueue().referenceHandled(this); } @Override @@ -280,4 +285,16 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl> return messageID; } + @Override + public long getPersistentSize() { + if (messageSize == -1) { + try { + messageSize = getPagedMessage().getPersistentSize(); + } catch (Throwable e) { + ActiveMQServerLogger.LOGGER.errorCalculatePersistentSize(e); + } + } + return messageSize; + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java index 076f872..52d1c83 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PagePositionImpl.java @@ -37,6 +37,12 @@ public class PagePositionImpl implements PagePosition { private long recordID = -1; /** + * Optional size value that can be set to specify the peristent size of the message + * for metrics tracking purposes + */ + private long persistentSize; + + /** * @param pageNr * @param messageNr */ @@ -82,6 +88,22 @@ public class PagePositionImpl implements PagePosition { return messageNr; } + /** + * @return the persistentSize + */ + @Override + public long getPersistentSize() { + return persistentSize; + } + + /** + * @param persistentSize the persistentSize to set + */ + @Override + public void setPersistentSize(long persistentSize) { + this.persistentSize = persistentSize; + } + @Override public int compareTo(PagePosition o) { if (pageNr > o.getPageNr()) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java index 01ad778..3bb56f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java @@ -21,10 +21,10 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.impl.Page; @@ -60,10 +60,13 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { private final Executor executor; private final AtomicLong value = new AtomicLong(0); + private final AtomicLong persistentSize = new AtomicLong(0); private final AtomicLong added = new AtomicLong(0); + private final AtomicLong addedPersistentSize = new AtomicLong(0); private final AtomicLong pendingValue = new AtomicLong(0); + private final AtomicLong pendingPersistentSize = new AtomicLong(0); private final LinkedList<Long> incrementRecords = new LinkedList<>(); @@ -71,9 +74,9 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { // we will recount a page case we still see pending records // as soon as we close a page we remove these records replacing by a regular page increment record // A Map per pageID, each page will have a set of IDs, with the increment on each one - private final Map<Long, Pair<Long, AtomicInteger>> pendingCounters = new HashMap<>(); + private final Map<Long, PendingCounter> pendingCounters = new HashMap<>(); - private LinkedList<Pair<Long, Integer>> loadList; + private LinkedList<PendingCounter> loadList; private final Runnable cleanupCheck = new Runnable() { @Override @@ -104,6 +107,16 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { return value.get() + pendingValue.get(); } + @Override + public long getPersistentSizeAdded() { + return addedPersistentSize.get() + pendingPersistentSize.get(); + } + + @Override + public long getPersistentSize() { + return persistentSize.get() + pendingPersistentSize.get(); + } + /** * This is used only on non transactional paging * @@ -112,24 +125,25 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { * @throws Exception */ @Override - public synchronized void pendingCounter(Page page, int increment) throws Exception { + public synchronized void pendingCounter(Page page, int increment, long size) throws Exception { if (!persistent) { return; // nothing to be done } - Pair<Long, AtomicInteger> pendingInfo = pendingCounters.get((long) page.getPageId()); + PendingCounter pendingInfo = pendingCounters.get((long) page.getPageId()); if (pendingInfo == null) { // We have to make sure this is sync here // not syncing this to disk may cause the page files to be out of sync on pages. // we can't afford the case where a page file is written without a record here long id = storage.storePendingCounter(this.subscriptionID, page.getPageId(), increment); - pendingInfo = new Pair<>(id, new AtomicInteger(1)); + pendingInfo = new PendingCounter(id, increment, size); pendingCounters.put((long) page.getPageId(), pendingInfo); } else { - pendingInfo.getB().addAndGet(increment); + pendingInfo.addAndGet(increment, size); } pendingValue.addAndGet(increment); + pendingPersistentSize.addAndGet(size); page.addPendingCounter(this); } @@ -141,23 +155,25 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { */ @Override public void cleanupNonTXCounters(final long pageID) throws Exception { - Pair<Long, AtomicInteger> pendingInfo; + PendingCounter pendingInfo; synchronized (this) { pendingInfo = pendingCounters.remove(pageID); } if (pendingInfo != null) { - final AtomicInteger valueCleaned = pendingInfo.getB(); + final int valueCleaned = pendingInfo.getCount(); + final long valueSizeCleaned = pendingInfo.getPersistentSize(); Transaction tx = new TransactionImpl(storage); - storage.deletePendingPageCounter(tx.getID(), pendingInfo.getA()); + storage.deletePendingPageCounter(tx.getID(), pendingInfo.getId()); // To apply the increment of the value just being cleaned - increment(tx, valueCleaned.get()); + increment(tx, valueCleaned, valueSizeCleaned); tx.addOperation(new TransactionOperationAbstract() { @Override public void afterCommit(Transaction tx) { - pendingValue.addAndGet(-valueCleaned.get()); + pendingValue.addAndGet(-valueCleaned); + pendingPersistentSize.updateAndGet(val -> val >= valueSizeCleaned ? val - valueSizeCleaned : 0); } }); @@ -166,21 +182,21 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { } @Override - public void increment(Transaction tx, int add) throws Exception { + public void increment(Transaction tx, int add, long size) throws Exception { if (tx == null) { if (persistent) { - long id = storage.storePageCounterInc(this.subscriptionID, add); - incrementProcessed(id, add); + long id = storage.storePageCounterInc(this.subscriptionID, add, size); + incrementProcessed(id, add, size); } else { - incrementProcessed(-1, add); + incrementProcessed(-1, add, size); } } else { if (persistent) { tx.setContainsPersistent(); - long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add); - applyIncrementOnTX(tx, id, add); + long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add, size); + applyIncrementOnTX(tx, id, add, size); } else { - applyIncrementOnTX(tx, -1, add); + applyIncrementOnTX(tx, -1, add, size); } } } @@ -193,7 +209,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { * @param add */ @Override - public void applyIncrementOnTX(Transaction tx, long recordID1, int add) { + public void applyIncrementOnTX(Transaction tx, long recordID1, int add, long size) { CounterOperations oper = (CounterOperations) tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC); if (oper == null) { @@ -202,22 +218,24 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { tx.addOperation(oper); } - oper.operations.add(new ItemOper(this, recordID1, add)); + oper.operations.add(new ItemOper(this, recordID1, add, size)); } @Override - public synchronized void loadValue(final long recordID1, final long value1) { + public synchronized void loadValue(final long recordID1, final long value1, long size) { if (this.subscription != null) { // it could be null on testcases... which is ok this.subscription.notEmpty(); } this.value.set(value1); this.added.set(value1); + this.persistentSize.set(size); + this.addedPersistentSize.set(size); this.recordID = recordID1; } - public synchronized void incrementProcessed(long id, int add) { - addInc(id, add); + public synchronized void incrementProcessed(long id, int add, long size) { + addInc(id, add, size); if (incrementRecords.size() > FLUSH_COUNTER) { executor.execute(cleanupCheck); } @@ -259,12 +277,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { } @Override - public void loadInc(long id, int add) { + public void loadInc(long id, int add, long size) { if (loadList == null) { loadList = new LinkedList<>(); } - loadList.add(new Pair<>(id, add)); + loadList.add(new PendingCounter(id, add, size)); } @Override @@ -275,10 +293,12 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { subscription.notEmpty(); } - for (Pair<Long, Integer> incElement : loadList) { - value.addAndGet(incElement.getB()); - added.addAndGet(incElement.getB()); - incrementRecords.add(incElement.getA()); + for (PendingCounter incElement : loadList) { + value.addAndGet(incElement.getCount()); + added.addAndGet(incElement.getCount()); + persistentSize.addAndGet(incElement.getPersistentSize()); + addedPersistentSize.addAndGet(incElement.getPersistentSize()); + incrementRecords.add(incElement.getId()); } loadList.clear(); loadList = null; @@ -286,11 +306,15 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { } @Override - public synchronized void addInc(long id, int variance) { + public synchronized void addInc(long id, int variance, long size) { value.addAndGet(variance); + this.persistentSize.addAndGet(size); if (variance > 0) { added.addAndGet(variance); } + if (size > 0) { + addedPersistentSize.addAndGet(size); + } if (id >= 0) { incrementRecords.add(id); } @@ -310,11 +334,13 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { ArrayList<Long> deleteList; long valueReplace; + long sizeReplace; synchronized (this) { if (incrementRecords.size() <= FLUSH_COUNTER) { return; } valueReplace = value.get(); + sizeReplace = persistentSize.get(); deleteList = new ArrayList<>(incrementRecords); incrementRecords.clear(); } @@ -332,7 +358,7 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { storage.deletePageCounter(txCleanup, recordID); } - newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace); + newRecordID = storage.storePageCounter(txCleanup, subscriptionID, valueReplace, sizeReplace); if (logger.isTraceEnabled()) { logger.trace("Replacing page-counter record = " + recordID + " by record = " + newRecordID + " on subscriptionID = " + this.subscriptionID + " for queue = " + this.subscription.getQueue().getName()); @@ -354,10 +380,11 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { private static class ItemOper { - private ItemOper(PageSubscriptionCounterImpl counter, long id, int add) { + private ItemOper(PageSubscriptionCounterImpl counter, long id, int add, long persistentSize) { this.counter = counter; this.id = id; this.amount = add; + this.persistentSize = persistentSize; } PageSubscriptionCounterImpl counter; @@ -365,6 +392,8 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { long id; int amount; + + long persistentSize; } private static class CounterOperations extends TransactionOperationAbstract implements TransactionOperation { @@ -374,8 +403,55 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter { @Override public void afterCommit(Transaction tx) { for (ItemOper oper : operations) { - oper.counter.incrementProcessed(oper.id, oper.amount); + oper.counter.incrementProcessed(oper.id, oper.amount, oper.persistentSize); } } } + + private static class PendingCounter { + private static final AtomicIntegerFieldUpdater<PendingCounter> COUNT_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(PendingCounter.class, "count"); + + private static final AtomicLongFieldUpdater<PendingCounter> SIZE_UPDATER = + AtomicLongFieldUpdater.newUpdater(PendingCounter.class, "persistentSize"); + + private final long id; + private volatile int count; + private volatile long persistentSize; + + /** + * @param id + * @param count + * @param size + */ + PendingCounter(long id, int count, long persistentSize) { + super(); + this.id = id; + this.count = count; + this.persistentSize = persistentSize; + } + /** + * @return the id + */ + public long getId() { + return id; + } + /** + * @return the count + */ + public int getCount() { + return count; + } + /** + * @return the size + */ + public long getPersistentSize() { + return persistentSize; + } + + public void addAndGet(int count, long persistentSize) { + COUNT_UPDATER.addAndGet(this, count); + SIZE_UPDATER.addAndGet(this, persistentSize); + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 24c69be..924aace 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -96,6 +96,8 @@ final class PageSubscriptionImpl implements PageSubscription { private final AtomicLong deliveredCount = new AtomicLong(0); + private final AtomicLong deliveredSize = new AtomicLong(0); + PageSubscriptionImpl(final PageCursorProvider cursorProvider, final PagingStore pageStore, final StorageManager store, @@ -178,6 +180,18 @@ final class PageSubscriptionImpl implements PageSubscription { } @Override + public long getPersistentSize() { + if (empty) { + return 0; + } else { + //A negative value could happen if an old journal was loaded that didn't have + //size metrics for old records + long messageSize = counter.getPersistentSize() - deliveredSize.get(); + return messageSize > 0 ? messageSize : 0; + } + } + + @Override public PageSubscriptionCounter getCounter() { return counter; } @@ -439,7 +453,7 @@ final class PageSubscriptionImpl implements PageSubscription { public void ackTx(final Transaction tx, final PagedReference reference) throws Exception { confirmPosition(tx, reference.getPosition()); - counter.increment(tx, -1); + counter.increment(tx, -1, -getPersistentSize(reference)); PageTransactionInfo txInfo = getPageTransaction(reference); if (txInfo != null) { @@ -831,6 +845,12 @@ final class PageSubscriptionImpl implements PageSubscription { } PageCursorInfo info = getPageInfo(position); + PageCache cache = info.getCache(); + long size = 0; + if (cache != null) { + size = getPersistentSize(cache.getMessage(position.getMessageNr())); + position.setPersistentSize(size); + } logger.tracef("InstallTXCallback looking up pagePosition %s, result=%s", position, info); @@ -1060,6 +1080,13 @@ final class PageSubscriptionImpl implements PageSubscription { } } + /** + * @return the cache + */ + public PageCache getCache() { + return cache != null ? cache.get() : null; + } + } private final class PageCursorTX extends TransactionOperationAbstract { @@ -1087,6 +1114,7 @@ final class PageSubscriptionImpl implements PageSubscription { for (PagePosition confirmed : positions) { cursor.processACK(confirmed); cursor.deliveredCount.decrementAndGet(); + cursor.deliveredSize.addAndGet(-confirmed.getPersistentSize()); } } @@ -1309,4 +1337,43 @@ final class PageSubscriptionImpl implements PageSubscription { public void close() { } } + + /** + * @return the deliveredCount + */ + @Override + public long getDeliveredCount() { + return deliveredCount.get(); + } + + /** + * @return the deliveredSize + */ + @Override + public long getDeliveredSize() { + return deliveredSize.get(); + } + + @Override + public void incrementDeliveredSize(long size) { + deliveredSize.addAndGet(size); + } + + private long getPersistentSize(PagedMessage msg) { + try { + return msg != null && msg.getPersistentSize() > 0 ? msg.getPersistentSize() : 0; + } catch (ActiveMQException e) { + logger.warn("Error computing persistent size of message: " + msg, e); + return 0; + } + } + + private long getPersistentSize(PagedReference ref) { + try { + return ref != null && ref.getPersistentSize() > 0 ? ref.getPersistentSize() : 0; + } catch (ActiveMQException e) { + logger.warn("Error computing persistent size of message: " + ref, e); + return 0; + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java index d7bd05c..3ef833d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java @@ -20,6 +20,7 @@ import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.paging.PagedMessage; @@ -169,4 +170,9 @@ public class PagedMessageImpl implements PagedMessage { message + "]"; } + + @Override + public long getPersistentSize() throws ActiveMQException { + return message.getPersistentSize(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index f1beb31..0eec5a0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -840,7 +840,8 @@ public class PagingStoreImpl implements PagingStore { // the apply counter will make sure we write a record on journal // especially on the case for non transactional sends and paging // doing this will give us a possibility of recovering the page counters - applyPageCounters(tx, getCurrentPage(), listCtx); + long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0; + applyPageCounters(tx, getCurrentPage(), listCtx, persistentSize); currentPage.write(pagedMessage); @@ -906,22 +907,22 @@ public class PagingStoreImpl implements PagingStore { * @param ctx * @throws Exception */ - private void applyPageCounters(Transaction tx, Page page, RouteContextList ctx) throws Exception { + private void applyPageCounters(Transaction tx, Page page, RouteContextList ctx, long size) throws Exception { List<org.apache.activemq.artemis.core.server.Queue> durableQueues = ctx.getDurableQueues(); List<org.apache.activemq.artemis.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues(); for (org.apache.activemq.artemis.core.server.Queue q : durableQueues) { if (tx == null) { // non transactional writes need an intermediate place // to avoid the counter getting out of sync - q.getPageSubscription().getCounter().pendingCounter(page, 1); + q.getPageSubscription().getCounter().pendingCounter(page, 1, size); } else { // null tx is treated through pending counters - q.getPageSubscription().getCounter().increment(tx, 1); + q.getPageSubscription().getCounter().increment(tx, 1, size); } } for (org.apache.activemq.artemis.core.server.Queue q : nonDurableQueues) { - q.getPageSubscription().getCounter().increment(tx, 1); + q.getPageSubscription().getCounter().increment(tx, 1, size); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 6defb1e..f9793d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -336,7 +336,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { /** * @return The ID with the stored counter */ - long storePageCounter(long txID, long queueID, long value) throws Exception; + long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception; long storePendingCounter(long queueID, long pageID, int inc) throws Exception; @@ -350,13 +350,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { * @return the ID with the increment record * @throws Exception */ - long storePageCounterInc(long txID, long queueID, int add) throws Exception; + long storePageCounterInc(long txID, long queueID, int add, long persistentSize) throws Exception; /** * @return the ID with the increment record * @throws Exception */ - long storePageCounterInc(long queueID, int add) throws Exception; + long storePageCounterInc(long queueID, int add, long size) throws Exception; /** * @return the bindings journal http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 34d249e..ada5b90 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -16,7 +16,13 @@ */ package org.apache.activemq.artemis.core.persistence.impl.journal; -import javax.transaction.xa.Xid; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME; + import java.io.File; import java.io.FileInputStream; import java.security.DigestInputStream; @@ -37,6 +43,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.transaction.xa.Xid; + import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Message; @@ -109,13 +117,6 @@ import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.jboss.logging.Logger; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME; - /** * Controls access to the journals and other storage files such as the ones used to store pages and * large messages. This class must control writing of any non-transient data, as it is the key point @@ -1084,7 +1085,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager); if (sub != null) { - sub.getCounter().loadValue(record.id, encoding.getValue()); + sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize()); } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID()); messageJournal.appendDeleteRecord(record.id, false); @@ -1101,7 +1102,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager); if (sub != null) { - sub.getCounter().loadInc(record.id, encoding.getValue()); + sub.getCounter().loadInc(record.id, encoding.getValue(), encoding.getPersistentSize()); } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID()); messageJournal.appendDeleteRecord(record.id, false); @@ -1136,6 +1137,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp case JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER: { PageCountPendingImpl pendingCountEncoding = new PageCountPendingImpl(); + pendingCountEncoding.decode(buff); pendingCountEncoding.setID(record.id); @@ -1143,6 +1145,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp if (pendingNonTXPageCounter != null) { pendingNonTXPageCounter.add(pendingCountEncoding); } + break; } @@ -1349,11 +1352,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } @Override - public long storePageCounterInc(long txID, long queueID, int value) throws Exception { + public long storePageCounterInc(long txID, long queueID, int value, long persistentSize) throws Exception { readLock(); try { long recordID = idGenerator.generateID(); - messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value)); + messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value, persistentSize)); return recordID; } finally { readUnLock(); @@ -1361,11 +1364,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } @Override - public long storePageCounterInc(long queueID, int value) throws Exception { + public long storePageCounterInc(long queueID, int value, long persistentSize) throws Exception { readLock(); try { final long recordID = idGenerator.generateID(); - messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value), true, getContext()); + messageJournal.appendAddRecord(recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_INC, new PageCountRecordInc(queueID, value, persistentSize), true, getContext()); return recordID; } finally { readUnLock(); @@ -1373,11 +1376,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp } @Override - public long storePageCounter(long txID, long queueID, long value) throws Exception { + public long storePageCounter(long txID, long queueID, long value, long persistentSize) throws Exception { readLock(); try { final long recordID = idGenerator.generateID(); - messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value)); + messageJournal.appendAddRecordTransactional(txID, recordID, JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE, new PageCountRecord(queueID, value, persistentSize)); return recordID; } finally { readUnLock(); @@ -1789,7 +1792,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager); if (sub != null) { - sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue()); + sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue(), encoding.getPersistentSize()); sub.notEmpty(); } else { ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index bfabc25..6cd417b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -16,7 +16,29 @@ */ package org.apache.activemq.artemis.core.persistence.impl.journal; -import javax.transaction.xa.Xid; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ID_COUNTER_RECORD; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COMPLETE; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME; +import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT; + import java.io.File; import java.io.PrintStream; import java.util.HashMap; @@ -24,6 +46,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import javax.transaction.xa.Xid; + import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.Message; @@ -58,29 +82,6 @@ import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.XidCodecSupport; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_REF; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_SETTING_RECORD; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_MESSAGE_PROTOCOL; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_REF; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.HEURISTIC_COMPLETION; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ID_COUNTER_RECORD; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COMPLETE; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_PENDING_COUNTER; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_TRANSACTION; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_BINDING_RECORD; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.QUEUE_STATUS_RECORD; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SECURITY_RECORD; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT; -import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADDRESS_BINDING_RECORD; - /** * Outputs a String description of the Journals contents. @@ -217,9 +218,9 @@ public final class DescribeJournal { out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.getValue()); } - subsCounter.loadValue(info.id, encoding.getValue()); + subsCounter.loadValue(info.id, encoding.getValue(), encoding.getPersistentSize()); subsCounter.processReload(); - out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + ", result=" + subsCounter.getValue()); + out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() + ", result=" + subsCounter.getValue()); if (subsCounter.getValue() < 0) { out.println(" #NegativeCounter!!!!"); } else { @@ -232,9 +233,9 @@ public final class DescribeJournal { PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter); - subsCounter.loadInc(info.id, encoding.getValue()); + subsCounter.loadInc(info.id, encoding.getValue(), encoding.getPersistentSize()); subsCounter.processReload(); - out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.getValue()); + out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " persistentSize=" + subsCounter.getPersistentSize() + " increased by " + encoding.getValue()); if (subsCounter.getValue() < 0) { out.println(" #NegativeCounter!!!!"); } else { @@ -321,7 +322,7 @@ public final class DescribeJournal { subsCounter = lookupCounter(counters, queueIDForCounter); - subsCounter.loadValue(info.id, encoding.getValue()); + subsCounter.loadValue(info.id, encoding.getValue(), encoding.getPersistentSize()); subsCounter.processReload(); } else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) { PageCountRecordInc encoding = (PageCountRecordInc) o; @@ -329,7 +330,7 @@ public final class DescribeJournal { subsCounter = lookupCounter(counters, queueIDForCounter); - subsCounter.loadInc(info.id, encoding.getValue()); + subsCounter.loadInc(info.id, encoding.getValue(), encoding.getPersistentSize()); subsCounter.processReload(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index 433031c..dabb039 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.core.persistence.impl.journal; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; @@ -35,6 +34,8 @@ import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.jboss.logging.Logger; +import io.netty.buffer.Unpooled; + public final class LargeServerMessageImpl extends CoreMessage implements LargeServerMessage { // Constants ----------------------------------------------------- @@ -345,6 +346,13 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe } @Override + public long getPersistentSize() throws ActiveMQException { + long size = super.getPersistentSize(); + size += getBodyEncoder().getLargeBodySize(); + + return size; + } + @Override public String toString() { return "LargeServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java index 56e8c87..e600d46 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountPendingImpl.java @@ -23,9 +23,10 @@ import org.apache.activemq.artemis.utils.DataConstants; public class PageCountPendingImpl implements EncodingSupport, PageCountPending { + @Override public String toString() { - return "PageCountPending [queueID=" + queueID + ", pageID=" + pageID + "]"; + return "PageCountPendingImpl [queueID=" + queueID + ", pageID=" + pageID + "]"; } public PageCountPendingImpl() { @@ -64,7 +65,7 @@ public class PageCountPendingImpl implements EncodingSupport, PageCountPending { @Override public int getEncodeSize() { - return DataConstants.SIZE_LONG * 2; + return DataConstants.SIZE_LONG * 3; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java index 642feb2..af9e135 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecord.java @@ -26,18 +26,21 @@ public class PageCountRecord implements EncodingSupport { private long value; + private long persistentSize; + @Override public String toString() { - return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]"; + return "PageCountRecord [queueID=" + queueID + ", value=" + value + ", persistentSize=" + persistentSize + "]"; } public PageCountRecord() { } - public PageCountRecord(long queueID, long value) { + public PageCountRecord(long queueID, long value, long persistentSize) { this.queueID = queueID; this.value = value; + this.persistentSize = persistentSize; } public long getQueueID() { @@ -48,21 +51,30 @@ public class PageCountRecord implements EncodingSupport { return value; } + public long getPersistentSize() { + return persistentSize; + } + @Override public int getEncodeSize() { - return DataConstants.SIZE_LONG * 2; + return DataConstants.SIZE_LONG * 3; } @Override public void encode(ActiveMQBuffer buffer) { buffer.writeLong(queueID); buffer.writeLong(value); + buffer.writeLong(persistentSize); } @Override public void decode(ActiveMQBuffer buffer) { queueID = buffer.readLong(); value = buffer.readLong(); + + if (buffer.readableBytes() > 0) { + persistentSize = buffer.readLong(); + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java index e427d68..c174155 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PageCountRecordInc.java @@ -26,17 +26,20 @@ public class PageCountRecordInc implements EncodingSupport { private int value; + private long persistentSize; + @Override public String toString() { - return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]"; + return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + ", persistentSize=" + persistentSize + "]"; } public PageCountRecordInc() { } - public PageCountRecordInc(long queueID, int value) { + public PageCountRecordInc(long queueID, int value, long persistentSize) { this.queueID = queueID; this.value = value; + this.persistentSize = persistentSize; } public long getQueueID() { @@ -47,21 +50,30 @@ public class PageCountRecordInc implements EncodingSupport { return value; } + public long getPersistentSize() { + return persistentSize; + } + @Override public int getEncodeSize() { - return DataConstants.SIZE_LONG + DataConstants.SIZE_INT; + return 2 * DataConstants.SIZE_LONG + DataConstants.SIZE_INT; } @Override public void encode(ActiveMQBuffer buffer) { buffer.writeLong(queueID); buffer.writeInt(value); + buffer.writeLong(persistentSize); } @Override public void decode(ActiveMQBuffer buffer) { queueID = buffer.readLong(); value = buffer.readInt(); + + if (buffer.readableBytes() > 0) { + persistentSize = buffer.readLong(); + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index 32f9010..8c5e11c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -467,7 +467,7 @@ public class NullStorageManager implements StorageManager { } @Override - public long storePageCounter(final long txID, final long queueID, final long value) throws Exception { + public long storePageCounter(final long txID, final long queueID, final long value, final long size) throws Exception { return 0; } @@ -489,12 +489,12 @@ public class NullStorageManager implements StorageManager { } @Override - public long storePageCounterInc(final long txID, final long queueID, final int add) throws Exception { + public long storePageCounterInc(final long txID, final long queueID, final int add, final long size) throws Exception { return 0; } @Override - public long storePageCounterInc(final long queueID, final int add) throws Exception { + public long storePageCounterInc(final long queueID, final int add, final long size) throws Exception { return 0; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 12c722a..73d6953 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1480,7 +1480,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding public void afterPrepare(final Transaction tx) { for (MessageReference ref : refs) { if (ref.isAlreadyAcked()) { - ref.getQueue().referenceHandled(); + ref.getQueue().referenceHandled(ref); ref.getQueue().incrementMesssagesAdded(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 7ae1ee4..10c827e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1911,4 +1911,8 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 224088, value = "Timeout ({0} seconds) while handshaking has occurred.", format = Message.Format.MESSAGE_FORMAT) void handshakeTimeout(int timeout); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 224089, value = "Failed to calculate persistent size", format = Message.Format.MESSAGE_FORMAT) + void errorCalculatePersistentSize(@Cause Throwable e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index 906ea7e..d9145b1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.server; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; @@ -99,4 +100,14 @@ public interface MessageReference { void setAlreadyAcked(); boolean isAlreadyAcked(); + + /** + * This is the size of the message when persisted on disk which is used for metrics tracking + * Note that even if the message itself is not persisted on disk (ie non-durable) this value is + * still used for metrics tracking for the amount of data on a queue + * + * @return + * @throws ActiveMQException + */ + long getPersistentSize() throws ActiveMQException; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index ff4e82b..c355dbf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -138,12 +138,42 @@ public interface Queue extends Bindable,CriticalComponent { long getMessageCount(); + /** + * This is the size of the messages in the queue when persisted on disk which is used for metrics tracking + * to give an idea of the amount of data on the queue to be consumed + * + * Note that this includes all messages on the queue, even messages that are non-durable which may only be in memory + */ + long getPersistentSize(); + + /** + * This is the number of the durable messages in the queue + */ + long getDurableMessageCount(); + + /** + * This is the persistent size of all the durable messages in the queue + */ + long getDurablePersistentSize(); + int getDeliveringCount(); - void referenceHandled(); + long getDeliveringSize(); + + int getDurableDeliveringCount(); + + long getDurableDeliveringSize(); + + void referenceHandled(MessageReference ref); int getScheduledCount(); + long getScheduledSize(); + + int getDurableScheduledCount(); + + long getDurableScheduledSize(); + List<MessageReference> getScheduledMessages(); /** @@ -314,8 +344,6 @@ public interface Queue extends Bindable,CriticalComponent { */ SimpleString getUser(); - void decDelivering(int size); - /** This is to perform a check on the counter again */ void recheckRefCount(OperationContext context); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java index 62fad5e..1dc2eda 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ScheduledDeliveryHandler.java @@ -27,6 +27,12 @@ public interface ScheduledDeliveryHandler { int getScheduledCount(); + long getScheduledSize(); + + int getDurableScheduledCount(); + + long getDurableScheduledSize(); + List<MessageReference> getScheduledReferences(); List<MessageReference> cancel(Filter filter) throws ActiveMQException;