This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit ac73917a37698f12335b18289a6306574f80e5a8 Author: dakirily <daniel.kiril...@gmail.com> AuthorDate: Thu Jul 8 15:51:16 2021 +0200 QPID-8549: [Broker-J] Improve null checks This closes #100 --- .../berkeleydb/BDBConfiguredObjectRecord.java | 5 +- .../server/consumer/AbstractConsumerTarget.java | 6 +- .../queue/DefinedGroupMessageGroupManager.java | 22 ++-- .../qpid/server/queue/SortedQueueEntryList.java | 15 ++- .../transport/MultiVersionProtocolEngine.java | 46 ++++--- .../protocol/v1_0/AMQPConnection_1_0Impl.java | 10 +- .../server/protocol/v1_0/ConsumerTarget_1_0.java | 142 +++++++++++---------- .../protocol/v1_0/MessageConverter_from_1_0.java | 22 ++-- .../org/apache/qpid/tools/StressTestClient.java | 6 +- .../apache/qpid/tools/util/ArgumentsParser.java | 6 +- 10 files changed, 158 insertions(+), 122 deletions(-) diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java index 3a6a9f0..715823d 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfiguredObjectRecord.java @@ -111,7 +111,10 @@ public class BDBConfiguredObjectRecord implements ConfiguredObjectRecord @Override public String toString() { - return "BDBConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", name=" + (_attributes == null ? null : _attributes.get("name")) + ", parents=" + _parents + "]"; + return String.format( + "BDBConfiguredObjectRecord [id=%s, type=%s, name=%s, parents=%s]", + _id, _type, (_attributes == null ? "null" : _attributes.get("name")), _parents + ); } } diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index 13d5e15..6733ed4 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -256,9 +256,9 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T> } } - if (messageContainer != null) + if (consumer != null && messageContainer != null) { - MessageInstance entry = messageContainer.getMessageInstance(); + final MessageInstance entry = messageContainer.getMessageInstance(); try { send(consumer, entry, false); @@ -281,7 +281,7 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T> case ROUTE_TO_ALTERNATE: if (consumer.acquires()) { - int enqueues = entry.routeToAlternate(null, null, null); + final int enqueues = entry.routeToAlternate(null, null, null); if (enqueues == 0) { LOGGER.info("Failed to convert message {} for this consumer because '{}'." diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index 06803a7..04bb11a 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -251,14 +251,20 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager private Object getKey(QueueEntry entry) { - ServerMessage message = entry.getMessage(); - AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader(); - Object groupVal = messageHeader == null - ? _defaultGroup - : _groupId == null - ? messageHeader.getGroupId() - : messageHeader.getHeader(_groupId); - if(groupVal == null) + final ServerMessage message = entry.getMessage(); + final AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader(); + Object groupVal; + if (messageHeader == null) + { + groupVal = _defaultGroup; + } + else + { + groupVal = _groupId == null + ? messageHeader.getGroupId() + : messageHeader.getHeader(_groupId); + } + if (groupVal == null) { groupVal = _defaultGroup; } diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index 3babb1b..a41a265 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.SortedQueueEntry.Colour; import org.apache.qpid.server.store.MessageEnqueueRecord; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; /** * A sorted implementation of QueueEntryList. @@ -84,7 +85,7 @@ public class SortedQueueEntryList extends AbstractQueueEntryList private void insert(final SortedQueueEntry entry) { SortedQueueEntry node; - if((node = _root) == null) + if ((node = _root) == null) { _root = entry; _head.setNext(entry); @@ -94,10 +95,10 @@ public class SortedQueueEntryList extends AbstractQueueEntryList else { SortedQueueEntry parent = null; - while(node != null) + while (node != null) { parent = node; - if(entry.compareTo(node) < 0) + if (entry.compareTo(node) < 0) { node = node.getLeft(); } @@ -106,9 +107,13 @@ public class SortedQueueEntryList extends AbstractQueueEntryList node = node.getRight(); } } + if (parent == null) + { + throw new ConnectionScopedRuntimeException("Failed to insert an entry, parent not found"); + } entry.setParent(parent); - if(entry.compareTo(parent) < 0) + if (entry.compareTo(parent) < 0) { parent.setLeft(entry); final SortedQueueEntry prev = parent.getPrev(); @@ -125,7 +130,7 @@ public class SortedQueueEntryList extends AbstractQueueEntryList entry.setNext(next); parent.setNext(entry); - if(next != null) + if (next != null) { next.setPrev(entry); } diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java b/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java index 56a0fa6..85e79fb 100755 --- a/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java +++ b/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java @@ -24,14 +24,18 @@ package org.apache.qpid.server.transport; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.security.cert.Certificate; +import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -402,10 +406,10 @@ public class MultiVersionProtocolEngine implements ProtocolEngine _header.put(msgheader); } - if(!_header.hasRemaining()) + if (!_header.hasRemaining()) { _header.flip(); - byte[] headerBytes = new byte[MINIMUM_REQUIRED_HEADER_BYTES]; + final byte[] headerBytes = new byte[MINIMUM_REQUIRED_HEADER_BYTES]; _header.get(headerBytes); @@ -416,25 +420,25 @@ public class MultiVersionProtocolEngine implements ProtocolEngine //Check the supported versions for a header match, and if there is one save the //delegate. Also save most recent supported version and associated reply header bytes - for(int i = 0; newDelegate == null && i < _creators.length; i++) + for (int i = 0; newDelegate == null && i < _creators.length; i++) { final ProtocolEngineCreator creator = _creators[i]; - if(_supported.contains(creator.getVersion())) + if (_supported.contains(creator.getVersion())) { supportedReplyBytes = creator.getHeaderIdentifier(); supportedReplyVersion = creator.getVersion(); - byte[] compareBytes = creator.getHeaderIdentifier(); + final byte[] compareBytes = creator.getHeaderIdentifier(); boolean equal = true; - for(int j = 0; equal && j<compareBytes.length; j++) + for (int j = 0; equal && j<compareBytes.length; j++) { equal = headerBytes[j] == compareBytes[j]; } - if(equal) + if (equal) { newDelegate = creator.newProtocolEngine(_broker, _network, _port, _transport, _id, _aggregateTicker); - if(newDelegate == null && creator.getSuggestedAlternativeHeader() != null) + if (newDelegate == null && creator.getSuggestedAlternativeHeader() != null) { defaultSupportedReplyBytes = creator.getSuggestedAlternativeHeader(); } @@ -443,17 +447,17 @@ public class MultiVersionProtocolEngine implements ProtocolEngine //If there is a configured default reply to an unsupported version initiation, //then save the associated reply header bytes when we encounter them - if(defaultSupportedReplyBytes == null && _defaultSupportedReply != null && creator.getVersion() == _defaultSupportedReply) + if (defaultSupportedReplyBytes == null && _defaultSupportedReply != null && creator.getVersion() == _defaultSupportedReply) { defaultSupportedReplyBytes = creator.getHeaderIdentifier(); } } // If no delegate is found then send back a supported protocol version id - if(newDelegate == null) + if (newDelegate == null) { //if a default reply was specified use its reply header instead of the most recent supported version - if(_defaultSupportedReply != null && !(_defaultSupportedReply == supportedReplyVersion)) + if (_defaultSupportedReply != null && !(_defaultSupportedReply == supportedReplyVersion)) { LOGGER.debug("Default reply to unsupported protocol version was configured, changing reply from {} to {}", supportedReplyVersion, _defaultSupportedReply); @@ -464,7 +468,16 @@ public class MultiVersionProtocolEngine implements ProtocolEngine _broker.getEventLogger().message(new PortLogSubject(_port), PortMessages.UNSUPPORTED_PROTOCOL_HEADER(Functions.str(headerBytes), - supportedReplyVersion.toString())); + String.valueOf(supportedReplyVersion))); + + if (supportedReplyBytes == null) + { + ProtocolEngineCreator protocol = Arrays.stream(_creators) + .filter(creator -> creator.getVersion().isAMQP() && _supported.contains(creator.getVersion())) + .max((creator1, creator2) -> creator1.getVersion().ordinal() - creator2.getVersion().ordinal()) + .orElseThrow(() -> new ServerScopedRuntimeException("All AMQP protocols are disabled")); + supportedReplyBytes = protocol.getHeaderIdentifier(); + } try (QpidByteBuffer supportedReplyBuf = QpidByteBuffer.allocateDirect(supportedReplyBytes.length)) { @@ -472,6 +485,7 @@ public class MultiVersionProtocolEngine implements ProtocolEngine supportedReplyBuf.flip(); _sender.send(supportedReplyBuf); } + _sender.flush(); _delegate = new ClosedDelegateProtocolEngine(); @@ -481,7 +495,7 @@ public class MultiVersionProtocolEngine implements ProtocolEngine } else { - boolean hasWork = _delegate.hasWork(); + final boolean hasWork = _delegate.hasWork(); if (hasWork) { newDelegate.notifyWork(); @@ -493,14 +507,14 @@ public class MultiVersionProtocolEngine implements ProtocolEngine _delegate.received(_header); _header.dispose(); - Certificate peerCertificate = _network.getPeerCertificate(); - if(peerCertificate != null && _port.getClientCertRecorder() != null) + final Certificate peerCertificate = _network.getPeerCertificate(); + if (peerCertificate != null && _port.getClientCertRecorder() != null) { ((ManagedPeerCertificateTrustStore)(_port.getClientCertRecorder())).addCertificate(peerCertificate); } - if(msg.hasRemaining()) + if (msg.hasRemaining()) { _delegate.received(msg); } diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java index bce5b85..a7b6c81 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java @@ -926,12 +926,12 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio try { - boolean registerSucceeded = addressSpace.registerConnection(this, (existingConnections, newConnection) -> + final boolean registerSucceeded = addressSpace.registerConnection(this, (existingConnections, newConnection) -> { boolean proceedWithRegistration = true; if (newConnection instanceof AMQPConnection_1_0Impl && !newConnection.isClosing()) { - List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>(); + final List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>(); for (AMQPConnection<?> existingConnection : StreamSupport.stream(existingConnections.spliterator(), false) .filter(con -> con instanceof AMQPConnection_1_0) .filter(con -> !con.isClosing()) @@ -953,10 +953,10 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio if (SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.equals(soleConnectionEnforcementPolicy)) { _properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true); - Error error = new Error(AmqpError.INVALID_FIELD, + final Error error = new Error(AmqpError.INVALID_FIELD, String.format( "Connection closed due to sole-connection-enforcement-policy '%s'", - soleConnectionEnforcementPolicy.toString())); + String.valueOf(soleConnectionEnforcementPolicy))); error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id"))); newConnection.doOnIOThreadAsync(() -> ((AMQPConnection_1_0Impl) newConnection).closeConnection(error)); proceedWithRegistration = false; @@ -967,7 +967,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio final Error error = new Error(AmqpError.RESOURCE_LOCKED, String.format( "Connection closed due to sole-connection-enforcement-policy '%s'", - soleConnectionEnforcementPolicy.toString())); + String.valueOf(soleConnectionEnforcementPolicy))); error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true)); rescheduleFutures.add(existingConnection.doOnIOThreadAsync( () -> ((AMQPConnection_1_0Impl) existingConnection).closeConnection(error))); diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index 5f9283e..9e1d168 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -458,7 +458,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0> { txn = _linkEndpoint.getTransaction(transactionId); getSession().getConnection().registerTransactedMessageDelivered(); - TransactionLogResource owningResource = _queueEntry.getOwningResource(); + final TransactionLogResource owningResource = _queueEntry.getOwningResource(); if (owningResource instanceof TransactionMonitor) { ((TransactionMonitor) owningResource).registerTransaction(txn); @@ -482,11 +482,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0> txn = null; } - if(outcome instanceof Accepted) + if (txn != null) { - if (_queueEntry.makeAcquisitionUnstealable(getConsumer())) + if (outcome instanceof Accepted) { - txn.dequeue(_queueEntry.getEnqueueRecord(), + if (_queueEntry.makeAcquisitionUnstealable(getConsumer())) + { + txn.dequeue(_queueEntry.getEnqueueRecord(), new ServerTransaction.Action() { @Override @@ -504,13 +506,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0> } }); - } - txn.addPostTransactionAction(new ServerTransaction.Action() + } + txn.addPostTransactionAction(new ServerTransaction.Action() { @Override public void postCommit() { - if(Boolean.TRUE.equals(settled)) + if (Boolean.TRUE.equals(settled)) { _linkEndpoint.settle(_deliveryTag); } @@ -523,94 +525,94 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0> @Override public void onRollback() { - if(Boolean.TRUE.equals(settled)) + if (Boolean.TRUE.equals(settled)) { // TODO: apply source's default outcome applyModifiedOutcome(); } } }); - } - else if(outcome instanceof Released) - { - txn.addPostTransactionAction(new ServerTransaction.Action() + } + else if (outcome instanceof Released) { - @Override - public void postCommit() + txn.addPostTransactionAction(new ServerTransaction.Action() { + @Override + public void postCommit() + { - _queueEntry.release(getConsumer()); - _linkEndpoint.settle(_deliveryTag); - } + _queueEntry.release(getConsumer()); + _linkEndpoint.settle(_deliveryTag); + } - @Override - public void onRollback() - { - _linkEndpoint.settle(_deliveryTag); + @Override + public void onRollback() + { + _linkEndpoint.settle(_deliveryTag); - // TODO: apply source's default outcome if settled - } - }); - } - else if(outcome instanceof Modified) - { - txn.addPostTransactionAction(new ServerTransaction.Action() + // TODO: apply source's default outcome if settled + } + }); + } + else if (outcome instanceof Modified) { - @Override - public void postCommit() + txn.addPostTransactionAction(new ServerTransaction.Action() { - Modified modifiedOutcome = (Modified) outcome; - if (Boolean.TRUE.equals(modifiedOutcome.getUndeliverableHere())) + @Override + public void postCommit() { - _queueEntry.reject(getConsumer()); - } + final Modified modifiedOutcome = (Modified) outcome; + if (Boolean.TRUE.equals(modifiedOutcome.getUndeliverableHere())) + { + _queueEntry.reject(getConsumer()); + } - if(Boolean.TRUE.equals(modifiedOutcome.getDeliveryFailed())) - { - incrementDeliveryCountOrRouteToAlternateOrDiscard(); - } - else - { - _queueEntry.release(getConsumer()); + if (Boolean.TRUE.equals(modifiedOutcome.getDeliveryFailed())) + { + incrementDeliveryCountOrRouteToAlternateOrDiscard(); + } + else + { + _queueEntry.release(getConsumer()); + } + _linkEndpoint.settle(_deliveryTag); } - _linkEndpoint.settle(_deliveryTag); - } - @Override - public void onRollback() - { - if(Boolean.TRUE.equals(settled)) + @Override + public void onRollback() { - // TODO: apply source's default outcome - applyModifiedOutcome(); + if (Boolean.TRUE.equals(settled)) + { + // TODO: apply source's default outcome + applyModifiedOutcome(); + } } - } - }); - } - else if (outcome instanceof Rejected) - { - txn.addPostTransactionAction(new ServerTransaction.Action() + }); + } + else if (outcome instanceof Rejected) { - @Override - public void postCommit() + txn.addPostTransactionAction(new ServerTransaction.Action() { - _linkEndpoint.settle(_deliveryTag); - incrementDeliveryCountOrRouteToAlternateOrDiscard(); - _linkEndpoint.sendFlowConditional(); - } + @Override + public void postCommit() + { + _linkEndpoint.settle(_deliveryTag); + incrementDeliveryCountOrRouteToAlternateOrDiscard(); + _linkEndpoint.sendFlowConditional(); + } - @Override - public void onRollback() - { - if(Boolean.TRUE.equals(settled)) + @Override + public void onRollback() { - // TODO: apply source's default outcome - applyModifiedOutcome(); + if (Boolean.TRUE.equals(settled)) + { + // TODO: apply source's default outcome + applyModifiedOutcome(); + } } - } - }); + }); + } } - return (transactionId == null && outcome != null); } diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java index b744d25..93ab0c3 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java @@ -26,6 +26,7 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.LinkedHashMap; @@ -78,7 +79,7 @@ public class MessageConverter_from_1_0 static Object convertBodyToObject(final Message_1_0 serverMessage) { - SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry()); + final SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry()); Object bodyObject = null; List<EncodingRetainingSection<?>> sections = null; @@ -89,12 +90,13 @@ public class MessageConverter_from_1_0 sections = sectionDecoder.parseAll(allData); } - List<EncodingRetainingSection<?>> bodySections = new ArrayList<>(sections.size()); - ListIterator<EncodingRetainingSection<?>> iterator = sections.listIterator(); + final int size = sections == null ? 0 : sections.size(); + final List<EncodingRetainingSection<?>> bodySections = new ArrayList<>(size); + final ListIterator<EncodingRetainingSection<?>> iterator = sections == null ? Collections.emptyListIterator() : sections.listIterator(); EncodingRetainingSection<?> previousSection = null; while(iterator.hasNext()) { - EncodingRetainingSection<?> section = iterator.next(); + final EncodingRetainingSection<?> section = iterator.next(); if (section instanceof AmqpValueSection || section instanceof DataSection || section instanceof AmqpSequenceSection) { if (previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValueSection)) @@ -112,20 +114,20 @@ public class MessageConverter_from_1_0 // In 1.0 of the spec, it is illegal to have message with no body but AMQP-127 asks to have that restriction lifted if (!bodySections.isEmpty()) { - EncodingRetainingSection<?> firstBodySection = bodySections.get(0); - if(firstBodySection instanceof AmqpValueSection) + final EncodingRetainingSection<?> firstBodySection = bodySections.get(0); + if (firstBodySection instanceof AmqpValueSection) { bodyObject = convertValue(firstBodySection.getValue()); } else if(firstBodySection instanceof DataSection) { int totalSize = 0; - for(EncodingRetainingSection<?> section : bodySections) + for (EncodingRetainingSection<?> section : bodySections) { totalSize += ((DataSection)section).getValue().getArray().length; } - byte[] bodyData = new byte[totalSize]; - ByteBuffer buf = ByteBuffer.wrap(bodyData); + final byte[] bodyData = new byte[totalSize]; + final ByteBuffer buf = ByteBuffer.wrap(bodyData); for(EncodingRetainingSection<?> section : bodySections) { buf.put(((DataSection) section).getValue().asByteBuffer()); @@ -134,7 +136,7 @@ public class MessageConverter_from_1_0 } else { - ArrayList<Object> totalSequence = new ArrayList<>(); + final ArrayList<Object> totalSequence = new ArrayList<>(); for(EncodingRetainingSection<?> section : bodySections) { totalSequence.addAll(((AmqpSequenceSection)section).getValue()); diff --git a/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java b/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java index 327a754..b4e847b 100644 --- a/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java +++ b/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java @@ -406,7 +406,11 @@ public class StressTestClient { System.out.println(CLASS + ": Consuming Message " + cs); } - Message msg = consumer.receive(recieveTimeout); + Message msg = null; + if (consumer != null) + { + msg = consumer.receive(recieveTimeout); + } if (sess.getTransacted() && cs % txBatch == 0) { diff --git a/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java b/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java index a71b466..e9c4110 100644 --- a/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java +++ b/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java @@ -133,7 +133,7 @@ public class ArgumentsParser public void usage(Class<?> objectClass, Set<String> requiredFields) { System.out.println("Supported arguments:"); - Field[] fields = objectClass.getDeclaredFields(); + final Field[] fields = objectClass.getDeclaredFields(); Object object = null; try @@ -147,7 +147,7 @@ public class ArgumentsParser for (int i = 0 ; i< fields.length ; i++) { - Field field = fields[i]; + final Field field = fields[i]; if (!Modifier.isFinal(field.getModifiers())) { Object defaultValue = null; @@ -163,7 +163,7 @@ public class ArgumentsParser System.out.println(" " + field.getName() + " ( type: " + field.getType().getSimpleName().toLowerCase() - + (object != null ? ", default: " + defaultValue : "") + + (object != null ? ", default: " + String.valueOf(defaultValue) : "") + (requiredFields != null && requiredFields.contains(field.getName()) ? ", mandatory" : "") + ")"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org