Repository: nifi Updated Branches: refs/heads/master 18a4819d5 -> e3b0949b6
NIFI-4508: This closes #2784. Update ConsumeAMQP to use basicConsume API instead of basicGet in order to provide better performance NIFI-4508: Added some additional unit tests to verify behavior Signed-off-by: joewitt <joew...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e3b0949b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e3b0949b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e3b0949b Branch: refs/heads/master Commit: e3b0949b6b92d154f96b709ad219408c54b28ee4 Parents: 18a4819 Author: Mark Payne <marka...@hotmail.com> Authored: Fri Jun 8 15:37:17 2018 -0400 Committer: joewitt <joew...@apache.org> Committed: Mon Jun 11 10:27:28 2018 -0400 ---------------------------------------------------------------------- .../nifi/amqp/processors/AMQPConsumer.java | 71 +++++++++- .../apache/nifi/amqp/processors/AMQPWorker.java | 20 +-- .../amqp/processors/AbstractAMQPProcessor.java | 1 + .../nifi/amqp/processors/ConsumeAMQP.java | 99 ++++++++++---- .../nifi/amqp/processors/AMQPConsumerTest.java | 39 ++++-- .../nifi/amqp/processors/ConsumeAMQPTest.java | 133 ++++++++++++++++++- .../nifi/amqp/processors/TestChannel.java | 51 ++++++- 7 files changed, 350 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java index 0466469..8872e0c 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java @@ -17,13 +17,20 @@ package org.apache.nifi.amqp.processors; import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeoutException; import org.apache.nifi.processor.exception.ProcessException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; import com.rabbitmq.client.GetResponse; /** @@ -34,12 +41,45 @@ final class AMQPConsumer extends AMQPWorker { private final static Logger logger = LoggerFactory.getLogger(AMQPConsumer.class); private final String queueName; + private final BlockingQueue<GetResponse> responseQueue; + private final boolean autoAcknowledge; + private final Consumer consumer; - AMQPConsumer(Connection connection, String queueName) { + private volatile boolean closed = false; + + + AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge) throws IOException { super(connection); this.validateStringProperty("queueName", queueName); this.queueName = queueName; + this.autoAcknowledge = autoAcknowledge; + this.responseQueue = new LinkedBlockingQueue<>(10); + logger.info("Successfully connected AMQPConsumer to " + connection.toString() + " and '" + queueName + "' queue"); + + final Channel channel = getChannel(); + consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException { + if (!autoAcknowledge && closed) { + channel.basicReject(envelope.getDeliveryTag(), true); + return; + } + + try { + responseQueue.put(new GetResponse(envelope, properties, body, Integer.MAX_VALUE)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }; + + channel.basicConsume(queueName, autoAcknowledge, consumer); + } + + // Visible for unit tests + protected Consumer getConsumer() { + return consumer; } /** @@ -53,12 +93,29 @@ final class AMQPConsumer extends AMQPWorker { * @return instance of {@link GetResponse} */ public GetResponse consume() { - try { - return getChannel().basicGet(this.queueName, true); - } catch (IOException e) { - logger.error("Failed to receive message from AMQP; " + this + ". Possible reasons: Queue '" + this.queueName - + "' may not have been defined", e); - throw new ProcessException(e); + return responseQueue.poll(); + } + + public void acknowledge(final GetResponse response) throws IOException { + if (autoAcknowledge) { + return; + } + + getChannel().basicAck(response.getEnvelope().getDeliveryTag(), true); + } + + @Override + public void close() throws TimeoutException, IOException { + closed = true; + + GetResponse lastMessage = null; + GetResponse response; + while ((response = responseQueue.poll()) != null) { + lastMessage = response; + } + + if (lastMessage != null) { + getChannel().basicNack(lastMessage.getEnvelope().getDeliveryTag(), true, true); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java index 990ed0b..d17ea0d 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java @@ -35,6 +35,7 @@ abstract class AMQPWorker implements AutoCloseable { private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class); private final Channel channel; + private boolean closed = false; /** * Creates an instance of this worker initializing it with AMQP @@ -58,20 +59,21 @@ abstract class AMQPWorker implements AutoCloseable { return channel; } - /** - * Closes {@link Channel} created when instance of this class was created. - */ + @Override public void close() throws TimeoutException, IOException { + if (closed) { + return; + } + if (logger.isDebugEnabled()) { logger.debug("Closing AMQP channel for " + this.channel.getConnection().toString()); } + this.channel.close(); + closed = true; } - /** - * - */ @Override public String toString() { return this.getClass().getSimpleName() + ":" + this.channel.getConnection().toString(); @@ -80,10 +82,8 @@ abstract class AMQPWorker implements AutoCloseable { /** * Validates that a String property has value (not null nor empty) * - * @param propertyName - * the name of the property - * @param value - * the value of the property + * @param propertyName the name of the property + * @param value the value of the property */ void validateStringProperty(String propertyName, String value) { if (value == null || value.trim().length() == 0) { http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java index 3e55283..18e176c 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java @@ -142,6 +142,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce private final BlockingQueue<AMQPResource<T>> resourceQueue = new LinkedBlockingQueue<>(); + /** * Will builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}) upon first invocation and will delegate to the * implementation of {@link #processResource(ProcessContext, ProcessSession)} method for further processing. http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java index 1b0ee52..55a53c7 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.amqp.processors; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -31,17 +32,19 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Connection; import com.rabbitmq.client.GetResponse; -@Tags({ "amqp", "rabbit", "get", "message", "receive", "consume" }) +@Tags({"amqp", "rabbit", "get", "message", "receive", "consume"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be " + "emitted as its own FlowFile to the 'success' relationship.") @@ -65,16 +68,36 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> { private static final String ATTRIBUTES_PREFIX = "amqp$"; public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() - .name("Queue") - .description("The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. ") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Queue") + .description("The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator. ") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder() + .name("auto.acknowledge") + .displayName("Auto-Acknowledge messages") + .description("If true, messages that are received will be auto-acknowledged by the AMQP Broker. " + + "This generally will provide better throughput but could result in messages being lost upon restart of NiFi") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("batch.size") + .displayName("Batch Size") + .description("The maximum number of messages that should be pulled in a single session. Once this many messages have been received (or once no more messages are readily available), " + + "the messages received will be transferred to the 'success' relationship and the messages will be acknowledged with the AMQP Broker. Setting this value to a larger number " + + "could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon sudden restart of NiFi.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .defaultValue("10") + .required(true) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles that are received from the AMQP queue are routed to this relationship") - .build(); + .name("success") + .description("All FlowFiles that are received from the AMQP queue are routed to this relationship") + .build(); private static final List<PropertyDescriptor> propertyDescriptors; private static final Set<Relationship> relationships; @@ -82,6 +105,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> { static { List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(QUEUE); + properties.add(AUTO_ACKNOWLEDGE); + properties.add(BATCH_SIZE); properties.addAll(getCommonPropertyDescriptors()); propertyDescriptors = Collections.unmodifiableList(properties); @@ -97,21 +122,40 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> { */ @Override protected void processResource(final Connection connection, final AMQPConsumer consumer, final ProcessContext context, final ProcessSession session) { - final GetResponse response = consumer.consume(); - if (response == null) { - context.yield(); - return; - } + GetResponse lastReceived = null; + + for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) { + final GetResponse response = consumer.consume(); + if (response == null) { + if (lastReceived == null) { + // If no messages received, then yield. + context.yield(); + } - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, out -> out.write(response.getBody())); + break; + } - final BasicProperties amqpProperties = response.getProps(); - final Map<String, String> attributes = buildAttributes(amqpProperties); - flowFile = session.putAllAttributes(flowFile, attributes); + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, out -> out.write(response.getBody())); - session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue()); - session.transfer(flowFile, REL_SUCCESS); + final BasicProperties amqpProperties = response.getProps(); + final Map<String, String> attributes = buildAttributes(amqpProperties); + flowFile = session.putAllAttributes(flowFile, attributes); + + session.getProvenanceReporter().receive(flowFile, connection.toString() + "/" + context.getProperty(QUEUE).getValue()); + session.transfer(flowFile, REL_SUCCESS); + lastReceived = response; + } + + session.commit(); + + if (lastReceived != null) { + try { + consumer.acknowledge(lastReceived); + } catch (IOException e) { + throw new ProcessException("Failed to acknowledge message", e); + } + } } private Map<String, String> buildAttributes(final BasicProperties properties) { @@ -142,9 +186,16 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> { } @Override - protected AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) { - final String queueName = context.getProperty(QUEUE).getValue(); - return new AMQPConsumer(connection, queueName); + protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext context, final Connection connection) { + try { + final String queueName = context.getProperty(QUEUE).getValue(); + final boolean autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean(); + final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge); + + return amqpConsumer; + } catch (final IOException ioe) { + throw new ProcessException("Failed to connect to AMQP Broker", ioe); + } } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java index 75fe716..87278fd 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java @@ -18,44 +18,59 @@ package org.apache.nifi.amqp.processors; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; -import org.apache.nifi.processor.exception.ProcessException; import org.junit.Test; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Connection; import com.rabbitmq.client.GetResponse; public class AMQPConsumerTest { - @SuppressWarnings("resource") + + @Test + public void testUnconsumedMessagesNacked() throws TimeoutException, IOException { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + + final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true); + consumer.getChannel().basicPublish("myExchange", "key1", new BasicProperties(), new byte[0]); + + consumer.close(); + assertTrue(((TestChannel) consumer.getChannel()).isNack(0)); + } + @Test(expected = IllegalArgumentException.class) - public void failOnNullConnection() { - new AMQPConsumer(null, null); + public void failOnNullConnection() throws IOException { + new AMQPConsumer(null, null, true); } - @SuppressWarnings("resource") @Test(expected = IllegalArgumentException.class) public void failOnNullQueueName() throws Exception { Connection conn = new TestConnection(null, null); - new AMQPConsumer(conn, null); + new AMQPConsumer(conn, null, true); } - @SuppressWarnings("resource") @Test(expected = IllegalArgumentException.class) public void failOnEmptyQueueName() throws Exception { Connection conn = new TestConnection(null, null); - new AMQPConsumer(conn, " "); + new AMQPConsumer(conn, " ", true); } - @Test(expected = ProcessException.class) + @Test(expected = IOException.class) public void failOnNonExistingQueue() throws Exception { Connection conn = new TestConnection(null, null); - try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello")) { + try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true)) { consumer.consume(); } } @@ -68,7 +83,7 @@ public class AMQPConsumerTest { exchangeToRoutingKeymap.put("", "queue1"); Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); - try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1")) { + try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) { GetResponse response = consumer.consume(); assertNull(response); } @@ -83,7 +98,7 @@ public class AMQPConsumerTest { Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); conn.createChannel().basicPublish("myExchange", "key1", null, "hello Joe".getBytes()); - try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1")) { + try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) { GetResponse response = consumer.consume(); assertNotNull(response); } http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index 66abb2d..6374fa6 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -16,26 +16,138 @@ */ package org.apache.nifi.amqp.processors; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Envelope; import com.rabbitmq.client.MessageProperties; public class ConsumeAMQPTest { + @Test + public void testMessageAcked() throws TimeoutException, IOException { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); + sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ConsumeAMQP.HOST, "injvm"); + runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); + runner.setProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE, "false"); + + runner.run(); + + runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2); + + final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + helloFF.assertContentEquals("hello"); + + final MockFlowFile worldFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1); + worldFF.assertContentEquals("world"); + + // A single cumulative ack should be used + assertFalse(((TestChannel) connection.createChannel()).isAck(0)); + assertTrue(((TestChannel) connection.createChannel()).isAck(1)); + } + } + + @Test + public void testBatchSizeAffectsAcks() throws TimeoutException, IOException { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); + sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); + + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ConsumeAMQP.HOST, "injvm"); + runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); + runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1"); + + runner.run(2); + + runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2); + + final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + helloFF.assertContentEquals("hello"); + + final MockFlowFile worldFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1); + worldFF.assertContentEquals("world"); + + // A single cumulative ack should be used + assertTrue(((TestChannel) connection.createChannel()).isAck(0)); + assertTrue(((TestChannel) connection.createChannel()).isAck(1)); + } + } + + @Test + public void testMessagesRejectedOnStop() throws TimeoutException, IOException { + final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1")); + final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); + + final Connection connection = new TestConnection(exchangeToRoutingKeymap, routingMap); + + try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); + sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); + sender.publish("good-bye".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); + + LocalConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ConsumeAMQP.HOST, "injvm"); + runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); + runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1"); + + runner.run(); + proc.close(); + + runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 1); + + final MockFlowFile helloFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); + helloFF.assertContentEquals("hello"); + + + // A single cumulative ack should be used + assertTrue(((TestChannel) connection.createChannel()).isAck(0)); + + // Messages 1 and 2 will have been delivered but on stop should be rejected. They will be rejected + // cumulatively, though, so only delivery Tag 2 will be nack'ed explicitly + assertTrue(((TestChannel) connection.createChannel()).isNack(2)); + + // Any newly delivered messages should also be immediately nack'ed. + proc.getAMQPWorker().getConsumer().handleDelivery("123", new Envelope(3, false, "myExchange", "key1"), new BasicProperties(), new byte[0]); + assertTrue(((TestChannel) connection.createChannel()).isNack(3)); + } + } @Test public void validateSuccessfullConsumeAndTransferToSuccess() throws Exception { @@ -47,8 +159,8 @@ public class ConsumeAMQPTest { try (AMQPPublisher sender = new AMQPPublisher(connection, mock(ComponentLog.class))) { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); - ConsumeAMQP pubProc = new LocalConsumeAMQP(connection); - TestRunner runner = TestRunners.newTestRunner(pubProc); + ConsumeAMQP proc = new LocalConsumeAMQP(connection); + TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(ConsumeAMQP.HOST, "injvm"); runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); @@ -56,11 +168,11 @@ public class ConsumeAMQPTest { final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); assertNotNull(successFF); } - } public static class LocalConsumeAMQP extends ConsumeAMQP { private final Connection connection; + private AMQPConsumer consumer; public LocalConsumeAMQP(Connection connection) { this.connection = connection; @@ -68,7 +180,20 @@ public class ConsumeAMQPTest { @Override protected AMQPConsumer createAMQPWorker(ProcessContext context, Connection connection) { - return new AMQPConsumer(connection, context.getProperty(QUEUE).getValue()); + try { + if (consumer != null) { + throw new IllegalStateException("Consumer already created"); + } + + consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean()); + return consumer; + } catch (IOException e) { + throw new ProcessException(e); + } + } + + public AMQPConsumer getAMQPWorker() { + return consumer; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java index 4ed4eeb..1011f62 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java @@ -18,9 +18,11 @@ package org.apache.nifi.amqp.processors; import java.io.IOException; import java.util.ArrayList; +import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -48,6 +50,7 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.ConsumerShutdownSignalCallback; import com.rabbitmq.client.DeliverCallback; +import com.rabbitmq.client.Envelope; import com.rabbitmq.client.GetResponse; import com.rabbitmq.client.Method; import com.rabbitmq.client.ReturnCallback; @@ -61,6 +64,8 @@ import com.rabbitmq.client.ShutdownSignalException; class TestChannel implements Channel { private final ExecutorService executorService; + private final Map<String, List<Consumer>> consumerMap = new HashMap<>(); + private final Map<String, BlockingQueue<GetResponse>> enqueuedMessages; private final Map<String, List<String>> routingKeyToQueueMappings; private final Map<String, String> exchangeToRoutingKeyMappings; @@ -68,6 +73,9 @@ class TestChannel implements Channel { private boolean open; private boolean corrupted; private Connection connection; + private long deliveryTag = 0L; + private final BitSet acknowledgments = new BitSet(); + private final BitSet nacks = new BitSet(); public TestChannel(Map<String, String> exchangeToRoutingKeyMappings, Map<String, List<String>> routingKeyToQueueMappings) { @@ -232,7 +240,9 @@ class TestChannel implements Channel { if (exchange.equals("")){ // default exchange; routingKey corresponds to a queue. BlockingQueue<GetResponse> messages = this.getMessageQueue(routingKey); - GetResponse response = new GetResponse(null, props, body, messages.size()); + final Envelope envelope = new Envelope(deliveryTag++, false, exchange, routingKey); + + GetResponse response = new GetResponse(envelope, props, body, messages.size()); messages.offer(response); } else { String rKey = this.exchangeToRoutingKeyMappings.get(exchange); @@ -244,8 +254,16 @@ class TestChannel implements Channel { } else { for (String queueName : queueNames) { BlockingQueue<GetResponse> messages = this.getMessageQueue(queueName); - GetResponse response = new GetResponse(null, props, body, messages.size()); + final Envelope envelope = new Envelope(deliveryTag++, false, exchange, routingKey); + GetResponse response = new GetResponse(envelope, props, body, messages.size()); messages.offer(response); + + final List<Consumer> consumers = consumerMap.get(queueName); + if (consumers != null) { + for (final Consumer consumer : consumers) { + consumer.handleDelivery("consumerTag", envelope, props, body); + } + } } } } else { @@ -461,20 +479,25 @@ class TestChannel implements Channel { @Override public void basicAck(long deliveryTag, boolean multiple) throws IOException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + acknowledgments.set((int) deliveryTag); + } + public boolean isAck(final int deliveryTag) { + return acknowledgments.get(deliveryTag); } @Override public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + nacks.set((int) deliveryTag); + } + public boolean isNack(final int deliveryTag) { + return nacks.get(deliveryTag); } @Override public void basicReject(long deliveryTag, boolean requeue) throws IOException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); - + nacks.set((int) deliveryTag); } @Override @@ -484,7 +507,21 @@ class TestChannel implements Channel { @Override public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + final BlockingQueue<GetResponse> messageQueue = enqueuedMessages.get(queue); + if (messageQueue == null) { + throw new IOException("Queue is not defined"); + } + + consumerMap.computeIfAbsent(queue, q -> new ArrayList<>()).add(callback); + + final String consumerTag = UUID.randomUUID().toString(); + + GetResponse message; + while ((message = messageQueue.poll()) != null) { + callback.handleDelivery(consumerTag, message.getEnvelope(), message.getProps(), message.getBody()); + } + + return consumerTag; } @Override