http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index d09f62f..2295987 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -25,9 +25,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; +import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage; @@ -81,9 +83,9 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.BindingQueryResult; +import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.jboss.logging.Logger; @@ -137,13 +139,23 @@ public class ServerSessionPacketHandler implements ChannelHandler { private volatile CoreRemotingConnection remotingConnection; + private final CoreProtocolManager manager; + + // The current currentLargeMessage being processed + private volatile LargeServerMessage currentLargeMessage; + private final boolean direct; - public ServerSessionPacketHandler(final ServerSession session, + public ServerSessionPacketHandler(final CoreProtocolManager manager, + final ServerSession session, final StorageManager storageManager, final Channel channel) { + this.manager = manager; + this.session = session; + session.addCloseable((boolean failed) -> clearLargeMessage()); + this.storageManager = storageManager; this.channel = channel; @@ -159,6 +171,16 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } + private void clearLargeMessage() { + if (currentLargeMessage != null) { + try { + currentLargeMessage.deleteFile(); + } catch (Throwable error) { + ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); + } + } + } + public ServerSession getSession() { return session; } @@ -469,7 +491,8 @@ public class ServerSessionPacketHandler implements ChannelHandler { case SESS_SEND: { SessionSendMessage message = (SessionSendMessage) packet; requiresResponse = message.isRequiresResponse(); - session.send((ServerMessage) message.getMessage(), direct); + message.getMessage().setProtocol(manager); + session.send(message.getMessage(), direct); if (requiresResponse) { response = new NullResponseMessage(); } @@ -477,13 +500,13 @@ public class ServerSessionPacketHandler implements ChannelHandler { } case SESS_SEND_LARGE: { SessionSendLargeMessage message = (SessionSendLargeMessage) packet; - session.sendLarge(message.getLargeMessage()); + sendLarge(message.getLargeMessage()); break; } case SESS_SEND_CONTINUATION: { SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet; requiresResponse = message.isRequiresResponse(); - session.sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues()); + sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues()); if (requiresResponse) { response = new NullResponseMessage(); } @@ -681,4 +704,53 @@ public class ServerSessionPacketHandler implements ChannelHandler { return serverLastReceivedCommandID; } + + // Large Message is part of the core protocol, we have these functions here as part of Packet handler + private void sendLarge(final Message message) throws Exception { + // need to create the LargeMessage before continue + long id = storageManager.generateID(); + + LargeServerMessage largeMsg = storageManager.createLargeMessage(id, message); + + if (logger.isTraceEnabled()) { + logger.trace("sendLarge::" + largeMsg); + } + + if (currentLargeMessage != null) { + ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(currentLargeMessage.getMessageID()); + } + + currentLargeMessage = largeMsg; + } + + + + private void sendContinuations(final int packetSize, + final long messageBodySize, + final byte[] body, + final boolean continues) throws Exception { + if (currentLargeMessage == null) { + throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised(); + } + + // Immediately release the credits for the continuations- these don't contribute to the in-memory size + // of the message + + currentLargeMessage.addBytes(body); + + if (!continues) { + currentLargeMessage.releaseResources(); + + if (messageBodySize >= 0) { + currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize); + } + + + session.doSend(session.getCurrentTransaction(), currentLargeMessage, false, false); + + currentLargeMessage = null; + } + } + + }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index e6595a5..919d84e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -168,7 +168,7 @@ public class ActiveMQPacketHandler implements ChannelHandler { ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext, routingTypeMap); - ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel); + ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager, session, server.getStorageManager(), channel); channel.setHandler(handler); // TODO - where is this removed? http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java index 7fed534..7560917 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java @@ -21,7 +21,10 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.core.message.impl.CoreMessagePersister; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; @@ -29,10 +32,21 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport; public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> { + public static final byte ID = 1; private static String[] SUPPORTED_PROTOCOLS = {ActiveMQClient.DEFAULT_CORE_PROTOCOL}; private static final String MODULE_NAME = "artemis-server"; + @Override + public byte getStoreID() { + return ID; + } + + @Override + public Persister<Message> getPersister() { + return CoreMessagePersister.getInstance(); + } + /** * {@inheritDoc} * * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index a6c73eb..3a09e91 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; @@ -28,7 +29,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -66,7 +66,7 @@ public final class CoreSessionCallback implements SessionCallback { @Override public int sendLargeMessage(MessageReference ref, - ServerMessage message, + Message message, ServerConsumer consumer, long bodySize, int deliveryCount) { @@ -92,7 +92,9 @@ public final class CoreSessionCallback implements SessionCallback { } @Override - public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) { + public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) { + + // TODO-now: fix this Packet packet = new SessionReceiveMessage(consumer.getID(), message, deliveryCount); int size = 0; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java index 89d2863..8d22fab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddMessage.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE; @@ -36,7 +36,9 @@ public final class ReplicationAddMessage extends PacketImpl { private byte journalRecordType; - private EncodingSupport encodingData; + private Persister persister; + + private Object encodingData; private byte[] recordData; @@ -48,12 +50,14 @@ public final class ReplicationAddMessage extends PacketImpl { final ADD_OPERATION_TYPE operation, final long id, final byte journalRecordType, - final EncodingSupport encodingData) { + final Persister persister, + final Object encodingData) { this(); this.journalID = journalID; this.operation = operation; this.id = id; this.journalRecordType = journalRecordType; + this.persister = persister; this.encodingData = encodingData; } @@ -66,8 +70,8 @@ public final class ReplicationAddMessage extends PacketImpl { buffer.writeBoolean(operation.toBoolean()); buffer.writeLong(id); buffer.writeByte(journalRecordType); - buffer.writeInt(encodingData.getEncodeSize()); - encodingData.encode(buffer); + buffer.writeInt(persister.getEncodeSize(encodingData)); + persister.encode(buffer, encodingData); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java index 59475e0..925181b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationAddTXMessage.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import java.util.Arrays; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERATION_TYPE; @@ -36,7 +36,9 @@ public class ReplicationAddTXMessage extends PacketImpl { private byte recordType; - private EncodingSupport encodingData; + private Persister persister; + + private Object encodingData; private byte[] recordData; @@ -51,7 +53,8 @@ public class ReplicationAddTXMessage extends PacketImpl { final long txId, final long id, final byte recordType, - final EncodingSupport encodingData) { + final Persister persister, + final Object encodingData) { this(); this.journalID = journalID; this.operation = operation; @@ -70,8 +73,8 @@ public class ReplicationAddTXMessage extends PacketImpl { buffer.writeLong(txId); buffer.writeLong(id); buffer.writeByte(recordType); - buffer.writeInt(encodingData.getEncodeSize()); - encodingData.encode(buffer); + buffer.writeInt(persister.getEncodeSize(encodingData)); + persister.encode(buffer, encodingData); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java index 7307151..b88e0fe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationPageWriteMessage.java @@ -48,7 +48,7 @@ public class ReplicationPageWriteMessage extends PacketImpl { @Override public void decodeRest(final ActiveMQBuffer buffer) { pageNumber = buffer.readInt(); - pagedMessage = new PagedMessageImpl(); + pagedMessage = new PagedMessageImpl(null); pagedMessage.decode(buffer); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java index ea3107c..c5318e7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java @@ -17,12 +17,14 @@ package org.apache.activemq.artemis.core.remoting.server; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.utils.ReusableLatch; @@ -65,6 +67,8 @@ public interface RemotingService { boolean isStarted(); + Map<String, ProtocolManagerFactory> getProtocolFactoryMap(); + /** * Allow acceptors to use this as their default security Principal if applicable. * <p> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 50bc90d..3e15f3e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -59,6 +59,7 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; +import org.apache.activemq.artemis.spi.core.protocol.MessagePersister; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -147,7 +148,9 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif this.scheduledThreadPool = scheduledThreadPool; CoreProtocolManagerFactory coreProtocolManagerFactory = new CoreProtocolManagerFactory(); - //i know there is only 1 + + MessagePersister.getInstance().registerProtocol(coreProtocolManagerFactory); + this.flushExecutor = flushExecutor; ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName()); @@ -174,6 +177,11 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif } @Override + public Map<String, ProtocolManagerFactory> getProtocolFactoryMap() { + return protocolMap; + } + + @Override public synchronized void start() throws Exception { if (started) { return; @@ -768,6 +776,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif */ private void loadProtocolManagerFactories(Iterable<ProtocolManagerFactory> protocolManagerFactoryCollection) { for (ProtocolManagerFactory next : protocolManagerFactoryCollection) { + MessagePersister.registerProtocol(next); String[] protocols = next.getProtocols(); for (String protocol : protocols) { ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, next.getModuleName()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index d70316f..0731e8c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.journal.IOCompletion; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.JournalLoadInformation; import org.apache.activemq.artemis.core.journal.LoaderCallback; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.RecordInfo; import org.apache.activemq.artemis.core.journal.TransactionFailureCallback; @@ -88,13 +89,14 @@ public class ReplicatedJournal implements Journal { @Override public void appendAddRecord(final long id, final byte recordType, - final EncodingSupport record, + Persister persister, + final Object record, final boolean sync) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("Append record id = " + id + " recordType = " + recordType); } - replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, record); - localJournal.appendAddRecord(id, recordType, record, sync); + replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record); + localJournal.appendAddRecord(id, recordType, persister, record, sync); } /** @@ -108,14 +110,15 @@ public class ReplicatedJournal implements Journal { @Override public void appendAddRecord(final long id, final byte recordType, - final EncodingSupport record, + Persister persister, + final Object record, final boolean sync, final IOCompletion completionCallback) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("Append record id = " + id + " recordType = " + recordType); } - replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, record); - localJournal.appendAddRecord(id, recordType, record, sync, completionCallback); + replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.ADD, id, recordType, persister, record); + localJournal.appendAddRecord(id, recordType, persister, record, sync, completionCallback); } /** @@ -146,12 +149,13 @@ public class ReplicatedJournal implements Journal { public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, - final EncodingSupport record) throws Exception { + final Persister persister, + final Object record) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("Append record TXid = " + id + " recordType = " + recordType); } - replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, record); - localJournal.appendAddRecordTransactional(txID, id, recordType, record); + replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record); + localJournal.appendAddRecordTransactional(txID, id, recordType, persister, record); } /** @@ -354,26 +358,28 @@ public class ReplicatedJournal implements Journal { @Override public void appendUpdateRecord(final long id, final byte recordType, - final EncodingSupport record, + final Persister persister, + final Object record, final boolean sync) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType); } - replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, record); - localJournal.appendUpdateRecord(id, recordType, record, sync); + replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record); + localJournal.appendUpdateRecord(id, recordType, persister, record, sync); } @Override public void appendUpdateRecord(final long id, final byte journalRecordType, - final EncodingSupport record, + final Persister persister, + final Object record, final boolean sync, final IOCompletion completionCallback) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType); } - replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, record); - localJournal.appendUpdateRecord(id, journalRecordType, record, sync, completionCallback); + replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record); + localJournal.appendUpdateRecord(id, journalRecordType, persister, record, sync, completionCallback); } /** @@ -404,12 +410,13 @@ public class ReplicatedJournal implements Journal { public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType, - final EncodingSupport record) throws Exception { + final Persister persister, + final Object record) throws Exception { if (ReplicatedJournal.trace) { ReplicatedJournal.trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType); } - replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, record); - localJournal.appendUpdateRecordTransactional(txID, id, recordType, record); + replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.UPDATE, txID, id, recordType, persister, record); + localJournal.appendUpdateRecordTransactional(txID, id, recordType, persister, record); } /** @@ -437,15 +444,6 @@ public class ReplicatedJournal implements Journal { } /** - * @param pages - * @see org.apache.activemq.artemis.core.journal.Journal#perfBlast(int) - */ - @Override - public void perfBlast(final int pages) { - localJournal.perfBlast(pages); - } - - /** * @throws Exception * @see org.apache.activemq.artemis.core.server.ActiveMQComponent#start() */ http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 1a07adc..e82d38e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; @@ -76,7 +77,7 @@ import org.apache.activemq.artemis.core.replication.ReplicationManager.ADD_OPERA import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; @@ -651,8 +652,8 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon private void handlePageWrite(final ReplicationPageWriteMessage packet) throws Exception { PagedMessage pgdMessage = packet.getPagedMessage(); pgdMessage.initMessage(storageManager); - ServerMessage msg = pgdMessage.getMessage(); - Page page = getPage(msg.getAddress(), packet.getPageNumber()); + Message msg = pgdMessage.getMessage(); + Page page = getPage(msg.getAddressSimpleString(), packet.getPageNumber()); page.write(pgdMessage); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index d0468d1..dce5990 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -147,9 +148,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene final ADD_OPERATION_TYPE operation, final long id, final byte recordType, - final EncodingSupport record) throws Exception { + final Persister persister, + final Object record) throws Exception { if (enabled) { - sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, record)); + sendReplicatePacket(new ReplicationAddMessage(journalID, operation, id, recordType, persister, record)); } } @@ -164,9 +166,10 @@ public final class ReplicationManager implements ActiveMQComponent, ReadyListene final long txID, final long id, final byte recordType, - final EncodingSupport record) throws Exception { + final Persister persister, + final Object record) throws Exception { if (enabled) { - sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, record)); + sendReplicatePacket(new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, persister, record)); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 6ee844b..e72d5d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorService; import io.netty.channel.Channel; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; + import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; @@ -577,12 +578,8 @@ public interface ActiveMQServerLogger extends BasicLogger { void ioErrorAddingReferences(Integer errorCode, String errorMessage); @LogMessage(level = Logger.Level.WARN) - @Message(id = 222058, value = "Duplicate message detected through the bridge - message will not be routed. Message information:\n{0}", format = Message.Format.MESSAGE_FORMAT) - void duplicateMessageDetectedThruBridge(ServerMessage message); - - @LogMessage(level = Logger.Level.WARN) @Message(id = 222059, value = "Duplicate message detected - message will not be routed. Message information:\n{0}", format = Message.Format.MESSAGE_FORMAT) - void duplicateMessageDetected(ServerMessage message); + void duplicateMessageDetected(org.apache.activemq.artemis.api.core.Message message); @LogMessage(level = Logger.Level.WARN) @Message(id = 222060, value = "Error while confirming large message completion on rollback for recordID={0}", format = Message.Format.MESSAGE_FORMAT) @@ -783,7 +780,7 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.WARN) @Message(id = 222110, value = "no queue IDs defined!, originalMessage = {0}, copiedMessage = {1}, props={2}", format = Message.Format.MESSAGE_FORMAT) - void noQueueIdDefined(ServerMessage message, ServerMessage messageCopy, SimpleString idsHeaderName); + void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message, org.apache.activemq.artemis.api.core.Message messageCopy, SimpleString idsHeaderName); @LogMessage(level = Logger.Level.TRACE) @Message(id = 222111, value = "exception while invoking {0} on {1}", http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java index 0e38634..1ede0ea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Bindable.java @@ -16,9 +16,11 @@ */ package org.apache.activemq.artemis.core.server; +import org.apache.activemq.artemis.api.core.Message; + public interface Bindable { - void route(ServerMessage message, RoutingContext context) throws Exception; + void route(Message message, RoutingContext context) throws Exception; - void routeWithAck(ServerMessage message, RoutingContext context) throws Exception; + void routeWithAck(Message message, RoutingContext context) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index 2a16ed2..aa58a7d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -17,10 +17,11 @@ package org.apache.activemq.artemis.core.server; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.replication.ReplicatedLargeMessage; -public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessage { +public interface LargeServerMessage extends ReplicatedLargeMessage, Message { @Override void addBytes(byte[] bytes) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java index a1e6a20..799b0b0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; import org.apache.activemq.artemis.core.transaction.Transaction; /** @@ -26,9 +29,14 @@ import org.apache.activemq.artemis.core.transaction.Transaction; */ public interface MessageReference { + final class Factory { + public static MessageReference createReference(Message encode, final Queue queue) { + return new MessageReferenceImpl(encode, queue); + } + } boolean isPaged(); - ServerMessage getMessage(); + Message getMessage(); /** * We define this method aggregation here because on paging we need to hold the original estimate, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index ae377bb..d7b70a3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; @@ -197,7 +198,7 @@ public interface Queue extends Bindable { void cancelRedistributor() throws Exception; - boolean hasMatchingConsumer(ServerMessage message); + boolean hasMatchingConsumer(Message message); Collection<Consumer> getConsumers(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java deleted file mode 100644 index 40dc50f..0000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerMessage.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.server; - -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.core.paging.PagingStore; - -/** - * A ServerMessage - */ -public interface ServerMessage extends MessageInternal, EncodingSupport { - - ServerMessage setMessageID(long id); - - MessageReference createReference(Queue queue); - - /** - * This will force encoding of the address, and will re-check the buffer - * This is to avoid setMessageTransient which set the address without changing the buffer - * - * @param address - */ - void forceAddress(SimpleString address); - - int incrementRefCount() throws Exception; - - int decrementRefCount() throws Exception; - - int incrementDurableRefCount(); - - int decrementDurableRefCount(); - - ServerMessage copy(long newID); - - ServerMessage copy(); - - int getMemoryEstimate(); - - int getRefCount(); - - ServerMessage makeCopyForExpiryOrDLA(long newID, - MessageReference originalReference, - boolean expiry, - boolean copyOriginalHeaders) throws Exception; - - void setOriginalHeaders(ServerMessage other, MessageReference originalReference, boolean expiry); - - void setPagingStore(PagingStore store); - - PagingStore getPagingStore(); - - // Is there any _AMQ_ property being used - boolean hasInternalProperties(); - - boolean storeIsPaging(); - - void encodeMessageIDToBuffer(); - - byte[] getDuplicateIDBytes(); - - Object getDuplicateProperty(); -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index f4e2ec7..1899d65 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -21,10 +21,11 @@ import javax.transaction.xa.Xid; import java.util.List; import java.util.Set; +import org.apache.activemq.artemis.Closeable; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.security.SecurityAuth; @@ -99,6 +100,8 @@ public interface ServerSession extends SecurityAuth { void stop(); + void addCloseable(Closeable closeable); + /** * To be used by protocol heads that needs to control the transaction outside the session context. */ @@ -178,18 +181,19 @@ public interface ServerSession extends SecurityAuth { void receiveConsumerCredits(long consumerID, int credits) throws Exception; - void sendContinuations(int packetSize, long totalBodySize, byte[] body, boolean continues) throws Exception; - RoutingStatus send(Transaction tx, - ServerMessage message, + Message message, boolean direct, boolean noAutoCreateQueue) throws Exception; - RoutingStatus send(ServerMessage message, boolean direct, boolean noAutoCreateQueue) throws Exception; + RoutingStatus doSend(final Transaction tx, + final Message msg, + final boolean direct, + final boolean noAutoCreateQueue) throws Exception; - RoutingStatus send(ServerMessage message, boolean direct) throws Exception; + RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception; - void sendLarge(MessageInternal msg) throws Exception; + RoutingStatus send(Message message, boolean direct) throws Exception; void forceConsumerDelivery(long consumerID, long sequence) throws Exception; @@ -249,7 +253,9 @@ public interface ServerSession extends SecurityAuth { SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception; - SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; + SimpleString getMatchingQueue(SimpleString address, + SimpleString queueName, + RoutingType routingType) throws Exception; AddressInfo getAddress(SimpleString address); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java index 1583f2c..48f4aa9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Transformer.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.artemis.core.server.cluster; -import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.api.core.Message; public interface Transformer { - ServerMessage transform(ServerMessage message); + Message transform(Message message); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index ee549c5..18a1f38 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -46,14 +46,14 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; + import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; + import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.Transformer; import org.apache.activemq.artemis.core.server.impl.QueueImpl; @@ -499,16 +499,16 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } /* Hook for processing message before forwarding */ - protected ServerMessage beforeForward(final ServerMessage message) { + protected Message beforeForward(final Message message) { if (useDuplicateDetection) { // We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID()); - message.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes); + message.putBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID, bytes); } if (transformer != null) { - final ServerMessage transformedMessage = transformer.transform(message); + final Message transformedMessage = transformer.transform(message); if (transformedMessage != message) { if (logger.isDebugEnabled()) { logger.debug("The transformer " + transformer + @@ -556,7 +556,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled refs.put(ref.getMessage().getMessageID(), ref); } - final ServerMessage message = beforeForward(ref.getMessage()); + final Message message = beforeForward(ref.getMessage()); final SimpleString dest; @@ -564,7 +564,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled dest = forwardingAddress; } else { // Preserve the original address - dest = message.getAddress(); + dest = message.getAddressSimpleString(); } pendingAcks.countUp(); @@ -686,7 +686,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled * @param message * @return */ - private HandleStatus deliverStandardMessage(SimpleString dest, final MessageReference ref, ServerMessage message) { + private HandleStatus deliverStandardMessage(SimpleString dest, final MessageReference ref, Message message) { // if we failover during send then there is a chance that the // that this will throw a disconnect, we need to remove the message // from the acks so it will get resent, duplicate detection will cope @@ -697,6 +697,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } try { + // TODO-now: replace this producer.send(dest, message); } catch (final ActiveMQException e) { ActiveMQServerLogger.LOGGER.bridgeUnableToSendMessage(e, ref); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index f16d863..524bb08 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientConsumer; @@ -36,12 +37,11 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; + import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; @@ -113,7 +113,7 @@ public class ClusterConnectionBridge extends BridgeImpl { this.discoveryLocator = discoveryLocator; - idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name); + idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(name); this.clusterConnection = clusterConnection; @@ -150,13 +150,13 @@ public class ClusterConnectionBridge extends BridgeImpl { } @Override - protected ServerMessage beforeForward(final ServerMessage message) { + protected Message beforeForward(final Message message) { // We make a copy of the message, then we strip out the unwanted routing id headers and leave // only // the one pertinent for the address node - this is important since different queues on different // nodes could have same queue ids // Note we must copy since same message may get routed to other nodes which require different headers - ServerMessage messageCopy = message.copy(); + Message messageCopy = message.copy(); if (logger.isTraceEnabled()) { logger.trace("Clustered bridge copied message " + message + " as " + messageCopy + " before delivery"); @@ -175,12 +175,12 @@ public class ClusterConnectionBridge extends BridgeImpl { } for (SimpleString propName : propNames) { - if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS)) { + if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) { messageCopy.removeProperty(propName); } } - messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds); + messageCopy.putBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds); messageCopy = super.beforeForward(messageCopy); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index c585405..e9477a8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.utils.ReusableLatch; @@ -139,7 +138,7 @@ public class Redistributor implements Consumer { final Transaction tx = new TransactionImpl(storageManager); - final Pair<RoutingContext, ServerMessage> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx); + final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx); if (routingInfo == null) { return HandleStatus.BUSY; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java index 8f54b2a..9803433 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/RemoteQueueBindingImpl.java @@ -23,15 +23,15 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; -import org.apache.activemq.artemis.core.message.impl.MessageImpl; + import org.apache.activemq.artemis.core.postoffice.BindingType; import org.apache.activemq.artemis.core.server.Bindable; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.jboss.logging.Logger; @@ -88,7 +88,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { queueFilter = FilterImpl.createFilter(filterString); - idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName); + idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(bridgeName); this.distance = distance; } @@ -149,7 +149,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { } @Override - public synchronized boolean isHighAcceptPriority(final ServerMessage message) { + public synchronized boolean isHighAcceptPriority(final Message message) { if (consumerCount == 0) { return false; } @@ -172,7 +172,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { } @Override - public void route(final ServerMessage message, final RoutingContext context) { + public void route(final Message message, final RoutingContext context) { addRouteContextToMessage(message); List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress()); @@ -185,7 +185,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { } @Override - public void routeWithAck(ServerMessage message, RoutingContext context) { + public void routeWithAck(Message message, RoutingContext context) { addRouteContextToMessage(message); List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress()); @@ -315,7 +315,7 @@ public class RemoteQueueBindingImpl implements RemoteQueueBinding { * * @param message */ - private void addRouteContextToMessage(final ServerMessage message) { + private void addRouteContextToMessage(final Message message) { byte[] ids = message.getBytesProperty(idsHeaderName); if (ids == null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index aa1ebf3..38500b7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2622,6 +2622,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public void addProtocolManagerFactory(ProtocolManagerFactory factory) { protocolManagerFactories.add(factory); + new Exception("protocol....").printStackTrace(); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index 619036d..bd3f303 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.server.impl; import org.apache.activemq.artemis.api.core.Message; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -25,7 +26,6 @@ import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.Transformer; import org.jboss.logging.Logger; @@ -83,7 +83,7 @@ public class DivertImpl implements Divert { } @Override - public void route(final ServerMessage message, final RoutingContext context) throws Exception { + public void route(final Message message, final RoutingContext context) throws Exception { // We must make a copy of the message, otherwise things like returning credits to the page won't work // properly on ack, since the original address will be overwritten @@ -91,7 +91,7 @@ public class DivertImpl implements Divert { logger.trace("Diverting message " + message + " into " + this); } - ServerMessage copy = null; + Message copy = null; // Shouldn't copy if it's not routed anywhere else if (!forwardAddress.equals(context.getAddress())) { @@ -99,7 +99,7 @@ public class DivertImpl implements Divert { copy = message.copy(id); // This will set the original MessageId, and the original address - copy.setOriginalHeaders(message, null, false); + copy.referenceOriginalMessage(message, null); copy.setAddress(forwardAddress); @@ -130,7 +130,7 @@ public class DivertImpl implements Divert { } @Override - public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception { + public void routeWithAck(Message message, RoutingContext context) throws Exception { route(message, context); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java index 40cef50..4adb1b2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/JournalLoader.java @@ -20,6 +20,7 @@ import javax.transaction.xa.Xid; import java.util.List; import java.util.Map; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.journal.Journal; @@ -29,7 +30,6 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo; import org.apache.activemq.artemis.core.persistence.impl.PageCountPending; import org.apache.activemq.artemis.core.persistence.impl.journal.AddMessageRecord; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -43,7 +43,7 @@ public interface JournalLoader { void handleAddMessage(Map<Long, Map<Long, AddMessageRecord>> queueMap) throws Exception; - void handleNoMessageReferences(Map<Long, ServerMessage> messages); + void handleNoMessageReferences(Map<Long, Message> messages); void handleGroupingBindings(List<GroupingInfo> groupingInfos); @@ -53,7 +53,7 @@ public interface JournalLoader { ResourceManager resourceManager, Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception; - void handlePreparedSendMessage(ServerMessage message, Transaction tx, long queueID) throws Exception; + void handlePreparedSendMessage(Message message, Transaction tx, long queueID) throws Exception; void handlePreparedAcknowledge(long messageID, List<MessageReference> referencesToAck, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index eb467ae..ab6ab62 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -22,6 +22,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.api.core.Message; + import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; @@ -31,7 +32,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.Transaction; @@ -74,7 +74,7 @@ public class LastValueQueue extends QueueImpl { return; } - SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); + SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString()); if (prop != null) { HolderReference hr = map.get(prop); @@ -98,7 +98,7 @@ public class LastValueQueue extends QueueImpl { @Override public synchronized void addHead(final MessageReference ref, boolean scheduling) { - SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); + SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString()); if (prop != null) { HolderReference hr = map.get(prop); @@ -148,7 +148,7 @@ public class LastValueQueue extends QueueImpl { @Override protected void refRemoved(MessageReference ref) { synchronized (this) { - SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); + SimpleString prop = ref.getMessage().getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME.toString()); if (prop != null) { map.remove(prop); @@ -223,7 +223,7 @@ public class LastValueQueue extends QueueImpl { } @Override - public ServerMessage getMessage() { + public Message getMessage() { return ref.getMessage(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java index 6d9030e..bffb1ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java @@ -18,11 +18,10 @@ package org.apache.activemq.artemis.core.server.impl; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.transaction.Transaction; -import org.apache.activemq.artemis.utils.MemorySize; /** * Implementation of a MessageReference @@ -35,7 +34,7 @@ public class MessageReferenceImpl implements MessageReference { private volatile long scheduledDeliveryTime; - private final ServerMessage message; + private final Message message; private final Queue queue; @@ -47,20 +46,7 @@ public class MessageReferenceImpl implements MessageReference { // Static -------------------------------------------------------- - private static final int memoryOffset; - - static { - // This is an estimate of how much memory a ServerMessageImpl takes up, exclusing body and properties - // Note, it is only an estimate, it's not possible to be entirely sure with Java - // This figure is calculated using the test utilities in org.apache.activemq.tests.unit.util.sizeof - // The value is somewhat higher on 64 bit architectures, probably due to different alignment - - if (MemorySize.is64bitArch()) { - memoryOffset = 48; - } else { - memoryOffset = 32; - } - } + private static final int memoryOffset = 64; // Constructors -------------------------------------------------- @@ -80,7 +66,7 @@ public class MessageReferenceImpl implements MessageReference { this.queue = queue; } - protected MessageReferenceImpl(final ServerMessage message, final Queue queue) { + public MessageReferenceImpl(final Message message, final Queue queue) { this.message = message; this.queue = queue; @@ -155,7 +141,7 @@ public class MessageReferenceImpl implements MessageReference { } @Override - public ServerMessage getMessage() { + public Message getMessage() { return message; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64681865/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 005a994..717e2e2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; import org.apache.activemq.artemis.core.server.management.ManagementService; @@ -233,8 +232,8 @@ public class PostOfficeJournalLoader implements JournalLoader { } @Override - public void handleNoMessageReferences(Map<Long, ServerMessage> messages) { - for (ServerMessage msg : messages.values()) { + public void handleNoMessageReferences(Map<Long, Message> messages) { + for (Message msg : messages.values()) { if (msg.getRefCount() == 0) { ActiveMQServerLogger.LOGGER.journalUnreferencedMessage(msg.getMessageID()); try { @@ -284,7 +283,7 @@ public class PostOfficeJournalLoader implements JournalLoader { } @Override - public void handlePreparedSendMessage(ServerMessage message, Transaction tx, long queueID) throws Exception { + public void handlePreparedSendMessage(Message message, Transaction tx, long queueID) throws Exception { Queue queue = queues.get(queueID); if (queue == null) {