Repository: nifi
Updated Branches:
  refs/heads/master 18a4819d5 -> e3b0949b6


NIFI-4508: This closes #2784. Update ConsumeAMQP to use basicConsume API 
instead of basicGet in order to provide better performance
NIFI-4508: Added some additional unit tests to verify behavior

Signed-off-by: joewitt <joew...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e3b0949b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e3b0949b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e3b0949b

Branch: refs/heads/master
Commit: e3b0949b6b92d154f96b709ad219408c54b28ee4
Parents: 18a4819
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Jun 8 15:37:17 2018 -0400
Committer: joewitt <joew...@apache.org>
Committed: Mon Jun 11 10:27:28 2018 -0400

----------------------------------------------------------------------
 .../nifi/amqp/processors/AMQPConsumer.java      |  71 +++++++++-
 .../apache/nifi/amqp/processors/AMQPWorker.java |  20 +--
 .../amqp/processors/AbstractAMQPProcessor.java  |   1 +
 .../nifi/amqp/processors/ConsumeAMQP.java       |  99 ++++++++++----
 .../nifi/amqp/processors/AMQPConsumerTest.java  |  39 ++++--
 .../nifi/amqp/processors/ConsumeAMQPTest.java   | 133 ++++++++++++++++++-
 .../nifi/amqp/processors/TestChannel.java       |  51 ++++++-
 7 files changed, 350 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
index 0466469..8872e0c 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
@@ -17,13 +17,20 @@
 package org.apache.nifi.amqp.processors;
 
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.nifi.processor.exception.ProcessException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.GetResponse;
 
 /**
@@ -34,12 +41,45 @@ final class AMQPConsumer extends AMQPWorker {
 
     private final static Logger logger = 
LoggerFactory.getLogger(AMQPConsumer.class);
     private final String queueName;
+    private final BlockingQueue<GetResponse> responseQueue;
+    private final boolean autoAcknowledge;
+    private final Consumer consumer;
 
-    AMQPConsumer(Connection connection, String queueName) {
+    private volatile boolean closed = false;
+
+
+    AMQPConsumer(final Connection connection, final String queueName, final 
boolean autoAcknowledge) throws IOException {
         super(connection);
         this.validateStringProperty("queueName", queueName);
         this.queueName = queueName;
+        this.autoAcknowledge = autoAcknowledge;
+        this.responseQueue = new LinkedBlockingQueue<>(10);
+
         logger.info("Successfully connected AMQPConsumer to " + 
connection.toString() + " and '" + queueName + "' queue");
+
+        final Channel channel = getChannel();
+        consumer = new DefaultConsumer(channel) {
+            @Override
+            public void handleDelivery(final String consumerTag, final 
Envelope envelope, final BasicProperties properties, final byte[] body) throws 
IOException {
+                if (!autoAcknowledge && closed) {
+                    channel.basicReject(envelope.getDeliveryTag(), true);
+                    return;
+                }
+
+                try {
+                    responseQueue.put(new GetResponse(envelope, properties, 
body, Integer.MAX_VALUE));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        };
+
+        channel.basicConsume(queueName, autoAcknowledge, consumer);
+    }
+
+    // Visible for unit tests
+    protected Consumer getConsumer() {
+        return consumer;
     }
 
     /**
@@ -53,12 +93,29 @@ final class AMQPConsumer extends AMQPWorker {
      * @return instance of {@link GetResponse}
      */
     public GetResponse consume() {
-        try {
-            return getChannel().basicGet(this.queueName, true);
-        } catch (IOException e) {
-            logger.error("Failed to receive message from AMQP; " + this + ". 
Possible reasons: Queue '" + this.queueName
-                    + "' may not have been defined", e);
-            throw new ProcessException(e);
+        return responseQueue.poll();
+    }
+
+    public void acknowledge(final GetResponse response) throws IOException {
+        if (autoAcknowledge) {
+            return;
+        }
+
+        getChannel().basicAck(response.getEnvelope().getDeliveryTag(), true);
+    }
+
+    @Override
+    public void close() throws TimeoutException, IOException {
+        closed = true;
+
+        GetResponse lastMessage = null;
+        GetResponse response;
+        while ((response = responseQueue.poll()) != null) {
+            lastMessage = response;
+        }
+
+        if (lastMessage != null) {
+            getChannel().basicNack(lastMessage.getEnvelope().getDeliveryTag(), 
true, true);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
index 990ed0b..d17ea0d 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
@@ -35,6 +35,7 @@ abstract class AMQPWorker implements AutoCloseable {
 
     private final static Logger logger = 
LoggerFactory.getLogger(AMQPWorker.class);
     private final Channel channel;
+    private boolean closed = false;
 
     /**
      * Creates an instance of this worker initializing it with AMQP
@@ -58,20 +59,21 @@ abstract class AMQPWorker implements AutoCloseable {
         return channel;
     }
 
-    /**
-     * Closes {@link Channel} created when instance of this class was created.
-     */
+
     @Override
     public void close() throws TimeoutException, IOException {
+        if (closed) {
+            return;
+        }
+
         if (logger.isDebugEnabled()) {
             logger.debug("Closing AMQP channel for " + 
this.channel.getConnection().toString());
         }
+
         this.channel.close();
+        closed = true;
     }
 
-    /**
-     *
-     */
     @Override
     public String toString() {
         return this.getClass().getSimpleName() + ":" + 
this.channel.getConnection().toString();
@@ -80,10 +82,8 @@ abstract class AMQPWorker implements AutoCloseable {
     /**
      * Validates that a String property has value (not null nor empty)
      *
-     * @param propertyName
-     *            the name of the property
-     * @param value
-     *            the value of the property
+     * @param propertyName the name of the property
+     * @param value the value of the property
      */
     void validateStringProperty(String propertyName, String value) {
         if (value == null || value.trim().length() == 0) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index 3e55283..18e176c 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -142,6 +142,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> 
extends AbstractProce
 
     private final BlockingQueue<AMQPResource<T>> resourceQueue = new 
LinkedBlockingQueue<>();
 
+
     /**
      * Builds the target resource ({@link AMQPPublisher} or {@link 
AMQPConsumer}) upon first invocation and will delegate to the
      * implementation of {@link #processResource(ProcessContext, 
ProcessSession)} method for further processing.

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 1b0ee52..55a53c7 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.amqp.processors;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,17 +32,19 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.GetResponse;
 
-@Tags({ "amqp", "rabbit", "get", "message", "receive", "consume" })
+@Tags({"amqp", "rabbit", "get", "message", "receive", "consume"})
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Consumes AMQP Messages from an AMQP Broker using the 
AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be 
"
     + "emitted as its own FlowFile to the 'success' relationship.")
@@ -65,16 +68,36 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
     private static final String ATTRIBUTES_PREFIX = "amqp$";
 
     public static final PropertyDescriptor QUEUE = new 
PropertyDescriptor.Builder()
-            .name("Queue")
-            .description("The name of the existing AMQP Queue from which 
messages will be consumed. Usually pre-defined by AMQP administrator. ")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
+        .name("Queue")
+        .description("The name of the existing AMQP Queue from which messages 
will be consumed. Usually pre-defined by AMQP administrator. ")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    static final PropertyDescriptor AUTO_ACKNOWLEDGE = new 
PropertyDescriptor.Builder()
+        .name("auto.acknowledge")
+        .displayName("Auto-Acknowledge messages")
+        .description("If true, messages that are received will be 
auto-acknowledged by the AMQP Broker. "
+            + "This generally will provide better throughput but could result 
in messages being lost upon restart of NiFi")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
+    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+        .name("batch.size")
+        .displayName("Batch Size")
+        .description("The maximum number of messages that should be pulled in 
a single session. Once this many messages have been received (or once no more 
messages are readily available), "
+            + "the messages received will be transferred to the 'success' 
relationship and the messages will be acknowledged with the AMQP Broker. 
Setting this value to a larger number "
+            + "could result in better performance, particularly for very small 
messages, but can also result in more messages being duplicated upon sudden 
restart of NiFi.")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .defaultValue("10")
+        .required(true)
+        .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("All FlowFiles that are received from the AMQP queue 
are routed to this relationship")
-            .build();
+        .name("success")
+        .description("All FlowFiles that are received from the AMQP queue are 
routed to this relationship")
+        .build();
 
     private static final List<PropertyDescriptor> propertyDescriptors;
     private static final Set<Relationship> relationships;
@@ -82,6 +105,8 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
     static {
         List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(QUEUE);
+        properties.add(AUTO_ACKNOWLEDGE);
+        properties.add(BATCH_SIZE);
         properties.addAll(getCommonPropertyDescriptors());
         propertyDescriptors = Collections.unmodifiableList(properties);
 
@@ -97,21 +122,40 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
      */
     @Override
     protected void processResource(final Connection connection, final 
AMQPConsumer consumer, final ProcessContext context, final ProcessSession 
session) {
-        final GetResponse response = consumer.consume();
-        if (response == null) {
-            context.yield();
-            return;
-        }
+        GetResponse lastReceived = null;
+
+        for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) {
+            final GetResponse response = consumer.consume();
+            if (response == null) {
+                if (lastReceived == null) {
+                    // If no messages received, then yield.
+                    context.yield();
+                }
 
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, out -> 
out.write(response.getBody()));
+                break;
+            }
 
-        final BasicProperties amqpProperties = response.getProps();
-        final Map<String, String> attributes = buildAttributes(amqpProperties);
-        flowFile = session.putAllAttributes(flowFile, attributes);
+            FlowFile flowFile = session.create();
+            flowFile = session.write(flowFile, out -> 
out.write(response.getBody()));
 
-        session.getProvenanceReporter().receive(flowFile, 
connection.toString() + "/" + context.getProperty(QUEUE).getValue());
-        session.transfer(flowFile, REL_SUCCESS);
+            final BasicProperties amqpProperties = response.getProps();
+            final Map<String, String> attributes = 
buildAttributes(amqpProperties);
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            session.getProvenanceReporter().receive(flowFile, 
connection.toString() + "/" + context.getProperty(QUEUE).getValue());
+            session.transfer(flowFile, REL_SUCCESS);
+            lastReceived = response;
+        }
+
+        session.commit();
+
+        if (lastReceived != null) {
+            try {
+                consumer.acknowledge(lastReceived);
+            } catch (IOException e) {
+                throw new ProcessException("Failed to acknowledge message", e);
+            }
+        }
     }
 
     private Map<String, String> buildAttributes(final BasicProperties 
properties) {
@@ -142,9 +186,16 @@ public class ConsumeAMQP extends 
AbstractAMQPProcessor<AMQPConsumer> {
     }
 
     @Override
-    protected AMQPConsumer createAMQPWorker(final ProcessContext context, 
final Connection connection) {
-        final String queueName = context.getProperty(QUEUE).getValue();
-        return new AMQPConsumer(connection, queueName);
+    protected synchronized AMQPConsumer createAMQPWorker(final ProcessContext 
context, final Connection connection) {
+        try {
+            final String queueName = context.getProperty(QUEUE).getValue();
+            final boolean autoAcknowledge = 
context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
+            final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, 
queueName, autoAcknowledge);
+
+            return amqpConsumer;
+        } catch (final IOException ioe) {
+            throw new ProcessException("Failed to connect to AMQP Broker", 
ioe);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
index 75fe716..87278fd 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
@@ -18,44 +18,59 @@ package org.apache.nifi.amqp.processors;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
-import org.apache.nifi.processor.exception.ProcessException;
 import org.junit.Test;
 
+import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.GetResponse;
 
 public class AMQPConsumerTest {
 
-    @SuppressWarnings("resource")
+
+    @Test
+    public void testUnconsumedMessagesNacked() throws TimeoutException, 
IOException {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+
+        final TestConnection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+        final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", 
true);
+        consumer.getChannel().basicPublish("myExchange", "key1", new 
BasicProperties(), new byte[0]);
+
+        consumer.close();
+        assertTrue(((TestChannel) consumer.getChannel()).isNack(0));
+    }
+
     @Test(expected = IllegalArgumentException.class)
-    public void failOnNullConnection() {
-        new AMQPConsumer(null, null);
+    public void failOnNullConnection() throws IOException {
+        new AMQPConsumer(null, null, true);
     }
 
-    @SuppressWarnings("resource")
     @Test(expected = IllegalArgumentException.class)
     public void failOnNullQueueName() throws Exception {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, null);
+        new AMQPConsumer(conn, null, true);
     }
 
-    @SuppressWarnings("resource")
     @Test(expected = IllegalArgumentException.class)
     public void failOnEmptyQueueName() throws Exception {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, " ");
+        new AMQPConsumer(conn, " ", true);
     }
 
-    @Test(expected = ProcessException.class)
+    @Test(expected = IOException.class)
     public void failOnNonExistingQueue() throws Exception {
         Connection conn = new TestConnection(null, null);
-        try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello")) {
+        try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true)) {
             consumer.consume();
         }
     }
@@ -68,7 +83,7 @@ public class AMQPConsumerTest {
         exchangeToRoutingKeymap.put("", "queue1");
 
         Connection conn = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
-        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1")) {
+        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) {
             GetResponse response = consumer.consume();
             assertNull(response);
         }
@@ -83,7 +98,7 @@ public class AMQPConsumerTest {
 
         Connection conn = new TestConnection(exchangeToRoutingKeymap, 
routingMap);
         conn.createChannel().basicPublish("myExchange", "key1", null, "hello 
Joe".getBytes());
-        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1")) {
+        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) {
             GetResponse response = consumer.consume();
             assertNotNull(response);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 66abb2d..6374fa6 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -16,26 +16,138 @@
  */
 package org.apache.nifi.amqp.processors;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
+import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.MessageProperties;
 
 public class ConsumeAMQPTest {
 
+    @Test
+    public void testMessageAcked() throws TimeoutException, IOException {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
+            sender.publish("world".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = TestRunners.newTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HOST, "injvm");
+            runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
+            runner.setProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE, "false");
+
+            runner.run();
+
+            runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2);
+
+            final MockFlowFile helloFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            helloFF.assertContentEquals("hello");
+
+            final MockFlowFile worldFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1);
+            worldFF.assertContentEquals("world");
+
+            // A single cumulative ack should be used
+            assertFalse(((TestChannel) connection.createChannel()).isAck(0));
+            assertTrue(((TestChannel) connection.createChannel()).isAck(1));
+        }
+    }
+
+    @Test
+    public void testBatchSizeAffectsAcks() throws TimeoutException, 
IOException {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
+            sender.publish("world".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
+
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = TestRunners.newTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HOST, "injvm");
+            runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
+            runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1");
+
+            runner.run(2);
+
+            runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 2);
+
+            final MockFlowFile helloFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            helloFF.assertContentEquals("hello");
+
+            final MockFlowFile worldFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(1);
+            worldFF.assertContentEquals("world");
+
+            // With a batch size of 1, each message is acked in its own batch, so both delivery tags are acked
+            assertTrue(((TestChannel) connection.createChannel()).isAck(0));
+            assertTrue(((TestChannel) connection.createChannel()).isAck(1));
+        }
+    }
+
+    @Test
+    public void testMessagesRejectedOnStop() throws TimeoutException, 
IOException {
+        final Map<String, List<String>> routingMap = 
Collections.singletonMap("key1", Arrays.asList("queue1"));
+        final Map<String, String> exchangeToRoutingKeymap = 
Collections.singletonMap("myExchange", "key1");
+
+        final Connection connection = new 
TestConnection(exchangeToRoutingKeymap, routingMap);
+
+        try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
+            sender.publish("hello".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
+            sender.publish("world".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
+            sender.publish("good-bye".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
+
+            LocalConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = TestRunners.newTestRunner(proc);
+            runner.setProperty(ConsumeAMQP.HOST, "injvm");
+            runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
+            runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1");
+
+            runner.run();
+            proc.close();
+
+            runner.assertTransferCount(PublishAMQP.REL_SUCCESS, 1);
+
+            final MockFlowFile helloFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
+            helloFF.assertContentEquals("hello");
+
+
+            // A single cumulative ack should be used
+            assertTrue(((TestChannel) connection.createChannel()).isAck(0));
+
+            // Messages 1 and 2 will have been delivered but on stop should be 
rejected. They will be rejected
+            // cumulatively, though, so only delivery Tag 2 will be nack'ed 
explicitly
+            assertTrue(((TestChannel) connection.createChannel()).isNack(2));
+
+            // Any newly delivered messages should also be immediately nack'ed.
+            proc.getAMQPWorker().getConsumer().handleDelivery("123", new 
Envelope(3, false, "myExchange", "key1"), new BasicProperties(), new byte[0]);
+            assertTrue(((TestChannel) connection.createChannel()).isNack(3));
+        }
+    }
 
     @Test
     public void validateSuccessfullConsumeAndTransferToSuccess() throws 
Exception {
@@ -47,8 +159,8 @@ public class ConsumeAMQPTest {
         try (AMQPPublisher sender = new AMQPPublisher(connection, 
mock(ComponentLog.class))) {
             sender.publish("hello".getBytes(), 
MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
 
-            ConsumeAMQP pubProc = new LocalConsumeAMQP(connection);
-            TestRunner runner = TestRunners.newTestRunner(pubProc);
+            ConsumeAMQP proc = new LocalConsumeAMQP(connection);
+            TestRunner runner = TestRunners.newTestRunner(proc);
             runner.setProperty(ConsumeAMQP.HOST, "injvm");
             runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
 
@@ -56,11 +168,11 @@ public class ConsumeAMQPTest {
             final MockFlowFile successFF = 
runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
             assertNotNull(successFF);
         }
-
     }
 
     public static class LocalConsumeAMQP extends ConsumeAMQP {
         private final Connection connection;
+        private AMQPConsumer consumer;
 
         public LocalConsumeAMQP(Connection connection) {
             this.connection = connection;
@@ -68,7 +180,20 @@ public class ConsumeAMQPTest {
 
         @Override
         protected AMQPConsumer createAMQPWorker(ProcessContext context, 
Connection connection) {
-            return new AMQPConsumer(connection, 
context.getProperty(QUEUE).getValue());
+            try {
+                if (consumer != null) {
+                    throw new IllegalStateException("Consumer already 
created");
+                }
+
+                consumer = new AMQPConsumer(connection, 
context.getProperty(QUEUE).getValue(), 
context.getProperty(AUTO_ACKNOWLEDGE).asBoolean());
+                return consumer;
+            } catch (IOException e) {
+                throw new ProcessException(e);
+            }
+        }
+
+        public AMQPConsumer getAMQPWorker() {
+            return consumer;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3b0949b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
index 4ed4eeb..1011f62 100644
--- 
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
+++ 
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
@@ -18,9 +18,11 @@ package org.apache.nifi.amqp.processors;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -48,6 +50,7 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Consumer;
 import com.rabbitmq.client.ConsumerShutdownSignalCallback;
 import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.GetResponse;
 import com.rabbitmq.client.Method;
 import com.rabbitmq.client.ReturnCallback;
@@ -61,6 +64,8 @@ import com.rabbitmq.client.ShutdownSignalException;
 class TestChannel implements Channel {
 
     private final ExecutorService executorService;
+    private final Map<String, List<Consumer>> consumerMap = new HashMap<>();
+
     private final Map<String, BlockingQueue<GetResponse>> enqueuedMessages;
     private final Map<String, List<String>> routingKeyToQueueMappings;
     private final Map<String, String> exchangeToRoutingKeyMappings;
@@ -68,6 +73,9 @@ class TestChannel implements Channel {
     private boolean open;
     private boolean corrupted;
     private Connection connection;
+    private long deliveryTag = 0L;
+    private final BitSet acknowledgments = new BitSet();
+    private final BitSet nacks = new BitSet();
 
     public TestChannel(Map<String, String> exchangeToRoutingKeyMappings,
             Map<String, List<String>> routingKeyToQueueMappings) {
@@ -232,7 +240,9 @@ class TestChannel implements Channel {
 
         if (exchange.equals("")){ // default exchange; routingKey corresponds 
to a queue.
             BlockingQueue<GetResponse> messages = 
this.getMessageQueue(routingKey);
-            GetResponse response = new GetResponse(null, props, body, 
messages.size());
+            final Envelope envelope = new Envelope(deliveryTag++, false, 
exchange, routingKey);
+
+            GetResponse response = new GetResponse(envelope, props, body, 
messages.size());
             messages.offer(response);
         } else {
             String rKey = this.exchangeToRoutingKeyMappings.get(exchange);
@@ -244,8 +254,16 @@ class TestChannel implements Channel {
                 } else {
                     for (String queueName : queueNames) {
                         BlockingQueue<GetResponse> messages = 
this.getMessageQueue(queueName);
-                        GetResponse response = new GetResponse(null, props, 
body, messages.size());
+                        final Envelope envelope = new Envelope(deliveryTag++, 
false, exchange, routingKey);
+                        GetResponse response = new GetResponse(envelope, 
props, body, messages.size());
                         messages.offer(response);
+
+                        final List<Consumer> consumers = 
consumerMap.get(queueName);
+                        if (consumers != null) {
+                            for (final Consumer consumer : consumers) {
+                                consumer.handleDelivery("consumerTag", 
envelope, props, body);
+                            }
+                        }
                     }
                 }
             } else {
@@ -461,20 +479,25 @@ class TestChannel implements Channel {
 
     @Override
     public void basicAck(long deliveryTag, boolean multiple) throws 
IOException {
-        throw new UnsupportedOperationException("This method is not currently 
supported as it is not used by current API in testing");
+        acknowledgments.set((int) deliveryTag);
+    }
 
+    public boolean isAck(final int deliveryTag) {
+        return acknowledgments.get(deliveryTag);
     }
 
     @Override
     public void basicNack(long deliveryTag, boolean multiple, boolean requeue) 
throws IOException {
-        throw new UnsupportedOperationException("This method is not currently 
supported as it is not used by current API in testing");
+        nacks.set((int) deliveryTag);
+    }
 
+    public boolean isNack(final int deliveryTag) {
+        return nacks.get(deliveryTag);
     }
 
     @Override
     public void basicReject(long deliveryTag, boolean requeue) throws 
IOException {
-        throw new UnsupportedOperationException("This method is not currently 
supported as it is not used by current API in testing");
-
+        nacks.set((int) deliveryTag);
     }
 
     @Override
@@ -484,7 +507,21 @@ class TestChannel implements Channel {
 
     @Override
     public String basicConsume(String queue, boolean autoAck, Consumer 
callback) throws IOException {
-        throw new UnsupportedOperationException("This method is not currently 
supported as it is not used by current API in testing");
+        final BlockingQueue<GetResponse> messageQueue = 
enqueuedMessages.get(queue);
+        if (messageQueue == null) {
+            throw new IOException("Queue is not defined");
+        }
+
+        consumerMap.computeIfAbsent(queue, q -> new 
ArrayList<>()).add(callback);
+
+        final String consumerTag = UUID.randomUUID().toString();
+
+        GetResponse message;
+        while ((message = messageQueue.poll()) != null) {
+            callback.handleDelivery(consumerTag, message.getEnvelope(), 
message.getProps(), message.getBody());
+        }
+
+        return consumerTag;
     }
 
     @Override

Reply via email to