Repository: nifi
Updated Branches:
  refs/heads/master 120d2100a -> 9b647cd53


NIFI-1148 added IMAP/POP3 support: added initial set of processors to support 
consumption of Email via IMAP/IMAPS and POP3 protocols

Signed-off-by: Matt Burgess <mattyb...@apache.org>

NIFI-1148 addressed PR comments from @trixpan

NIFI-1148 addressing PR comments

NIFI-1148 addressed PR comments

This closes #710


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9b647cd5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9b647cd5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9b647cd5

Branch: refs/heads/master
Commit: 9b647cd53842e442e294dc62033c8e6df6078b81
Parents: 120d210
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Sat Jul 23 10:42:40 2016 -0400
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Mon Aug 8 11:57:28 2016 -0400

----------------------------------------------------------------------
 .../nifi-email-processors/pom.xml               | 145 ++++---
 .../email/AbstractEmailProcessor.java           | 407 +++++++++++++++++++
 .../nifi/processors/email/ConsumeIMAP.java      |  91 +++++
 .../nifi/processors/email/ConsumePOP3.java      |  68 ++++
 .../org.apache.nifi.processor.Processor         |   2 +
 .../additionalDetails.html                      |  58 +++
 .../additionalDetails.html                      |  57 +++
 .../nifi/processors/email/ConsumeEmailTest.java | 135 ++++++
 8 files changed, 896 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
index 655ac86..75cd3d2 100644
--- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml
@@ -1,73 +1,84 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
-     license agreements. See the NOTICE file distributed with this work for 
additional
-     information regarding copyright ownership. The ASF licenses this file to
-     You under the Apache License, Version 2.0 (the "License"); you may not use
-     this file except in compliance with the License. You may obtain a copy of
-     the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
-     by applicable law or agreed to in writing, software distributed under the
-     License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS
-     OF ANY KIND, either express or implied. See the License for the specific
-     language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-        <modelVersion>4.0.0</modelVersion>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
+       license agreements. See the NOTICE file distributed with this work for 
additional 
+       information regarding copyright ownership. The ASF licenses this file 
to 
+       You under the Apache License, Version 2.0 (the "License"); you may not 
use 
+       this file except in compliance with the License. You may obtain a copy 
of 
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required 
+       by applicable law or agreed to in writing, software distributed under 
the 
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
+       OF ANY KIND, either express or implied. See the License for the 
specific 
+       language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
 
-        <parent>
-                <groupId>org.apache.nifi</groupId>
-                <artifactId>nifi-email-bundle</artifactId>
-                <version>1.0.0-SNAPSHOT</version>
-        </parent>
+       <parent>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-email-bundle</artifactId>
+               <version>1.0.0-SNAPSHOT</version>
+       </parent>
 
-        <artifactId>nifi-email-processors</artifactId>
-        <packaging>jar</packaging>
+       <artifactId>nifi-email-processors</artifactId>
+       <packaging>jar</packaging>
 
-        <dependencies>
-                <dependency>
-                        <groupId>org.apache.nifi</groupId>
-                        <artifactId>nifi-api</artifactId>
-                </dependency>
-                <dependency>
-                        <groupId>org.apache.nifi</groupId>
-                        <artifactId>nifi-processor-utils</artifactId>
-                </dependency>
-                <dependency>
-                        <groupId>javax.mail</groupId>
-                        <artifactId>mail</artifactId>
-                </dependency>
-                <dependency>
-                        <groupId>org.apache.commons</groupId>
-                        <artifactId>commons-email</artifactId>
-                        <version>1.4</version>
-                </dependency>
-                <dependency>
-                    <groupId>org.subethamail</groupId>
-                    <artifactId>subethasmtp</artifactId>
-                    <version>3.1.7</version>
-                </dependency>
-                <dependency>
-                    <groupId>org.apache.nifi</groupId>
-                    <artifactId>nifi-ssl-context-service-api</artifactId>
-                </dependency>
-            <dependency>
-                <groupId>org.apache.nifi</groupId>
-                <artifactId>nifi-ssl-context-service</artifactId>
-                <scope>test</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.nifi</groupId>
-                <artifactId>nifi-mock</artifactId>
-                <scope>test</scope>
-            </dependency>
-        </dependencies>
-        <build>
-            <plugins>
-                <plugin>
-                    <groupId>org.apache.rat</groupId>
-                    <artifactId>apache-rat-plugin</artifactId>
-                    <configuration>
-                    </configuration>
-                </plugin>
-            </plugins>
-        </build>
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-processor-utils</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>javax.mail</groupId>
+                       <artifactId>mail</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.commons</groupId>
+                       <artifactId>commons-email</artifactId>
+                       <version>1.4</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.subethamail</groupId>
+                       <artifactId>subethasmtp</artifactId>
+                       <version>3.1.7</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-ssl-context-service-api</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.springframework.integration</groupId>
+                       <artifactId>spring-integration-mail</artifactId>
+                       <version>4.3.0.RELEASE</version>
+               </dependency>
+               <dependency>
+                       <groupId>commons-logging</groupId>
+                       <artifactId>commons-logging</artifactId>
+                       <version>1.2</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-ssl-context-service</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.nifi</groupId>
+                       <artifactId>nifi-mock</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.rat</groupId>
+                               <artifactId>apache-rat-plugin</artifactId>
+                               <configuration>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
 </project>
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
new file mode 100644
index 0000000..e7fd27c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.email;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.mail.Address;
+import javax.mail.Flags;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.support.StaticListableBeanFactory;
+import org.springframework.integration.mail.AbstractMailReceiver;
+import org.springframework.util.Assert;
+import org.springframework.util.StreamUtils;
+
+/**
+ * Base processor for implementing processors to consume messages from Email
+ * servers using Spring Integration libraries.
+ *
+ * @param <T>
+ *            the type of {@link AbstractMailReceiver}.
+ */
+abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends 
AbstractProcessor {
+
+    /**
+     * Network address of the email server. Required, non-empty, supports
+     * expression language.
+     */
+    public static final PropertyDescriptor HOST = new 
PropertyDescriptor.Builder()
+            .name("host")
+            .displayName("Host Name")
+            .description("Network address of Email server (e.g., 
pop.gmail.com, imap.gmail.com . . .)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    /**
+     * Port of the email server. Required, validated as a port number,
+     * supports expression language.
+     */
+    public static final PropertyDescriptor PORT = new 
PropertyDescriptor.Builder()
+            .name("port")
+            .displayName("Port")
+            .description("Numeric value identifying Port of Email server 
(e.g., 993)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    /**
+     * User name presented to the email server. Required, non-empty, supports
+     * expression language.
+     */
+    public static final PropertyDescriptor USER = new 
PropertyDescriptor.Builder()
+            .name("user")
+            .displayName("User Name")
+            .description("User Name used for authentication and authorization 
with Email server.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    /**
+     * Password presented to the email server. Required, non-empty, supports
+     * expression language and is flagged as sensitive.
+     */
+    public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("password")
+            .displayName("Password")
+            .description("Password used for authentication and authorization 
with Email server.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+    /**
+     * Mail folder that messages are read from. Required, non-empty, supports
+     * expression language; defaults to "INBOX".
+     */
+    public static final PropertyDescriptor FOLDER = new 
PropertyDescriptor.Builder()
+            .name("folder")
+            .displayName("Folder")
+            .description("Email folder to retrieve messages from (e.g., 
INBOX)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .defaultValue("INBOX")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    /**
+     * Maximum number of messages fetched per call to the email server.
+     * Required positive integer, supports expression language; defaults to
+     * "10". The same value is used as the capacity of the internal message
+     * queue.
+     */
+    public static final PropertyDescriptor FETCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("fetch.size")
+            .displayName("Fetch Size")
+            .description("Specify the maximum number of Messages to fetch per 
call to Email Server.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .defaultValue("10")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+    /**
+     * Whether retrieved messages are flagged for deletion. Required boolean
+     * ("true"/"false"); defaults to "false". Expression language is not
+     * enabled for this property.
+     */
+    public static final PropertyDescriptor SHOULD_DELETE_MESSAGES = new 
PropertyDescriptor.Builder()
+            .name("delete.messages")
+            .displayName("Delete Messages")
+            .description("Specify whether mail messages should be deleted 
after retrieval.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    /**
+     * The only relationship exposed by the email processors: every message
+     * that was received and converted into a FlowFile is routed here.
+     */
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All messages that are successfully received from Email server and converted to FlowFiles are routed to this relationship")
+            .build();
+
+    static List<PropertyDescriptor> SHARED_DESCRIPTORS = new ArrayList<>();
+
+    static Set<Relationship> SHARED_RELATIONSHIPS = new HashSet<>();
+
+    /*
+     * Will ensure that list of PropertyDescriptors is build only once, since
+     * all other lifecycle methods are invoked multiple times.
+     */
+    static {
+        SHARED_DESCRIPTORS.add(HOST);
+        SHARED_DESCRIPTORS.add(PORT);
+        SHARED_DESCRIPTORS.add(USER);
+        SHARED_DESCRIPTORS.add(PASSWORD);
+        SHARED_DESCRIPTORS.add(FOLDER);
+        SHARED_DESCRIPTORS.add(FETCH_SIZE);
+        SHARED_DESCRIPTORS.add(SHOULD_DELETE_MESSAGES);
+
+        SHARED_RELATIONSHIPS.add(REL_SUCCESS);
+    }
+
+    // SLF4J logger named after the concrete (runtime) processor class.
+    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    // Mail receiver built lazily by initializeIfNecessary(); reset to null in
+    // stop(). A null value means "not initialized".
+    protected volatile T messageReceiver;
+
+    // Buffer of messages already fetched from the server but not yet turned
+    // into FlowFiles; created with FETCH_SIZE capacity during initialization.
+    private volatile BlockingQueue<Message> messageQueue;
+
+    // Connection URL with the password replaced by "[password]"; set by
+    // buildUrl() and used for logging and provenance reporting.
+    private volatile String displayUrl;
+
+    // Session captured during initialization; used by
+    // flushRemainingMessages() when the processor is stopped.
+    // NOTE(review): this session outlives the onTrigger call that supplied it
+    // - confirm that it is still usable at stop time.
+    private volatile ProcessSession processSession;
+
+    // Value of SHOULD_DELETE_MESSAGES read during initialization.
+    private volatile boolean shouldSetDeleteFlag;
+
+    @OnStopped
+    public void stop(ProcessContext processContext) {
+        this.flushRemainingMessages(processContext);
+        try {
+            this.messageReceiver.destroy();
+            this.messageReceiver = null;
+        } catch (Exception e) {
+            this.logger.warn("Failure while closing processor", e);
+        }
+    }
+
+    /**
+     * Returns the relationships supported by this processor, i.e. the shared
+     * static set populated in the class initializer.
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = SHARED_RELATIONSHIPS;
+        return relationships;
+    }
+
+    /**
+     * Lazily initializes the mail receiver, then takes at most one message
+     * from the internal queue and transfers it as a FlowFile. Does nothing
+     * further when no message is available.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
+        this.initializeIfNecessary(context, processSession);
+
+        final Message received = this.receiveMessage();
+        if (received == null) {
+            return;
+        }
+        this.transfer(received, context, processSession);
+    }
+
+    /**
+     *
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + 
propertyDescriptorName + "' Java Mail property.")
+                
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+                .build();
+    }
+
+    /**
+     * Delegates to sub-classes to build the target receiver as
+     * {@link AbstractMailReceiver}. Called while initializing the processor,
+     * when no receiver exists yet.
+     *
+     * @param context
+     *            instance of {@link ProcessContext}
+     * @return new instance of {@link AbstractMailReceiver}
+     */
+    protected abstract T buildMessageReceiver(ProcessContext context);
+
+    /**
+     * Returns the target receiver's mail protocol (e.g., imap, pop etc.). The
+     * returned value is used as the scheme of the connection URL.
+     *
+     * @param processContext
+     *            instance of {@link ProcessContext}
+     * @return the protocol name
+     */
+    protected abstract String getProtocol(ProcessContext processContext);
+
+    /**
+     * Builds the url used to connect to the email server.
+     */
+    String buildUrl(ProcessContext processContext) {
+        String host = 
processContext.getProperty(HOST).evaluateAttributeExpressions().getValue();
+        String port = 
processContext.getProperty(PORT).evaluateAttributeExpressions().getValue();
+        String user = 
processContext.getProperty(USER).evaluateAttributeExpressions().getValue();
+        String password = 
processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+        String folder = 
processContext.getProperty(FOLDER).evaluateAttributeExpressions().getValue();
+
+        StringBuilder urlBuilder = new StringBuilder();
+        try {
+            urlBuilder.append(URLEncoder.encode(user, "UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            throw new ProcessException(e);
+        }
+        urlBuilder.append(":");
+        try {
+            urlBuilder.append(URLEncoder.encode(password, "UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            throw new ProcessException(e);
+        }
+        urlBuilder.append("@");
+        urlBuilder.append(host);
+        urlBuilder.append(":");
+        urlBuilder.append(port);
+        urlBuilder.append("/");
+        urlBuilder.append(folder);
+
+        String protocol = this.getProtocol(processContext);
+        String finalUrl = protocol + "://" + urlBuilder.toString();
+
+        // build display-safe URL
+        int passwordStartIndex = urlBuilder.indexOf(":") + 1;
+        int passwordEndIndex = urlBuilder.indexOf("@");
+        urlBuilder.replace(passwordStartIndex, passwordEndIndex, "[password]");
+        this.displayUrl = protocol + "://" + urlBuilder.toString();
+        if (this.logger.isInfoEnabled()) {
+            this.logger.info("Connecting to Email server at the following URL: 
" + this.displayUrl);
+        }
+
+        return finalUrl;
+    }
+
+    /**
+     * Builds and initializes the target message receiver if necessary (if it's
+     * null). Upon execution of this operation the receiver is fully functional
+     * and is ready to receive messages.
+     * <p>
+     * The receiver is configured through a local variable and only assigned
+     * to the {@code messageReceiver} field once configuration has completed.
+     * If building or configuring it throws, the processor stays
+     * uninitialized and the next trigger retries, instead of running with a
+     * receiver whose message queue was never created.
+     *
+     * @param context
+     *            instance of {@link ProcessContext}
+     * @param processSession
+     *            session retained for flushing queued messages on stop
+     */
+    private synchronized void initializeIfNecessary(ProcessContext context, ProcessSession processSession) {
+        if (this.messageReceiver != null) {
+            return;
+        }
+
+        T receiver = this.buildMessageReceiver(context);
+
+        boolean deleteMessages = context.getProperty(SHOULD_DELETE_MESSAGES).asBoolean();
+        int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+
+        receiver.setMaxFetchSize(fetchSize);
+        receiver.setJavaMailProperties(this.buildJavaMailProperties(context));
+        // need to avoid spring warning messages
+        receiver.setBeanFactory(new StaticListableBeanFactory());
+        receiver.afterPropertiesSet();
+
+        // Publish state last; messageReceiver doubles as the "initialized"
+        // marker, so it must be assigned after everything else is in place.
+        this.processSession = processSession;
+        this.shouldSetDeleteFlag = deleteMessages;
+        this.messageQueue = new ArrayBlockingQueue<>(fetchSize);
+        this.messageReceiver = receiver;
+    }
+
+    /**
+     * Extracts dynamic properties which typically represent the Java Mail
+     * properties from the {@link ProcessContext} returnining them as instance
+     * of {@link Properties}
+     */
+    private Properties buildJavaMailProperties(ProcessContext context) {
+        Properties javaMailProperties = new Properties();
+        for (Entry<PropertyDescriptor, String> propertyDescriptorEntry : 
context.getProperties().entrySet()) {
+            if (propertyDescriptorEntry.getKey().isDynamic()) {
+                
javaMailProperties.setProperty(propertyDescriptorEntry.getKey().getName(),
+                        propertyDescriptorEntry.getValue());
+            }
+        }
+        return javaMailProperties;
+    }
+
+    /**
+     * Fills the internal message queue if such queue is empty. This is due to
+     * the fact that per single session there may be multiple messages
+     * retrieved from the email server (see FETCH_SIZE).
+     *
+     * @throws ProcessException
+     *             if the receiver fails to fetch messages from the server
+     */
+    private synchronized void fillMessageQueueIfNecessary() {
+        if (!this.messageQueue.isEmpty()) {
+            return;
+        }
+
+        Object[] messages;
+        try {
+            messages = this.messageReceiver.receive();
+        } catch (MessagingException e) {
+            String errorMsg = "Failed to receive messages from Email server: [" + e.getClass().getName()
+                    + " - " + e.getMessage() + "]";
+            this.getLogger().error(errorMsg);
+            throw new ProcessException(errorMsg, e);
+        }
+
+        if (messages == null) {
+            return;
+        }
+        for (Object message : messages) {
+            Assert.isTrue(message instanceof Message, "Message is not an instance of javax.mail.Message");
+            // offer() returns false when the bounded queue is full; report it
+            // rather than losing the message without a trace.
+            if (!this.messageQueue.offer((Message) message)) {
+                this.logger.warn("Internal message queue is full; a message received from Email server was dropped");
+            }
+        }
+    }
+
+    /**
+     * Disposes the message by converting it to a {@link FlowFile} and
+     * transferring it to the REL_SUCCESS relationship. A provenance RECEIVE
+     * event is reported against the display-safe URL, and afterwards the
+     * DELETED flag of the message is set to the configured value.
+     *
+     * @param emailMessage
+     *            the message whose content becomes the FlowFile content
+     * @param context
+     *            instance of {@link ProcessContext}
+     * @param processSession
+     *            session used to create and transfer the FlowFile
+     */
+    private void transfer(Message emailMessage, ProcessContext context, ProcessSession processSession) {
+        long start = System.nanoTime();
+        FlowFile flowFile = processSession.create();
+
+        flowFile = processSession.append(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                // StreamUtils.copy leaves both streams open, so the message
+                // stream has to be closed here; 'out' is owned by the session.
+                try (java.io.InputStream in = emailMessage.getInputStream()) {
+                    StreamUtils.copy(in, out);
+                } catch (MessagingException e) {
+                    throw new IOException(e);
+                }
+            }
+        });
+
+        long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+
+        String fromAddressesString = "";
+        try {
+            Address[] fromAddresses = emailMessage.getFrom();
+            if (fromAddresses != null) {
+                fromAddressesString = Arrays.asList(fromAddresses).toString();
+            }
+        } catch (MessagingException e) {
+            // Best effort: the sender is only used for reporting, so keep
+            // going, but keep the cause in the log.
+            this.logger.warn("Failed to retrieve 'From' attribute from Message.", e);
+        }
+
+        processSession.getProvenanceReporter().receive(flowFile, this.displayUrl, "Received message from " + fromAddressesString, executionDuration);
+        this.getLogger().info("Successfully received {} from {} in {} millis", new Object[] { flowFile, fromAddressesString, executionDuration });
+        processSession.transfer(flowFile, REL_SUCCESS);
+
+        try {
+            emailMessage.setFlag(Flags.Flag.DELETED, this.shouldSetDeleteFlag);
+        } catch (MessagingException e) {
+            this.logger.warn("Failed to set DELETE Flag on the message", e);
+            this.getLogger().warn("Failed to set DELETE Flag on the message");
+        }
+    }
+
+    /**
+     * Refills the internal queue when it is empty and then takes the next
+     * message from it, waiting at most one millisecond.
+     *
+     * @return the next message, or null when none is available or the thread
+     *         was interrupted while waiting
+     */
+    private Message receiveMessage() {
+        this.fillMessageQueueIfNecessary();
+        try {
+            return this.messageQueue.poll(1, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            this.logger.debug("Current thread is interrupted");
+            return null;
+        }
+    }
+
+    /**
+     * Will flush the remaining messages when this processor is stopped. The
+     * flushed messages are disposed via
+     * {@link #transfer(Message, ProcessContext, ProcessSession)} operation,
+     * committing the retained session after each message.
+     * <p>
+     * Does nothing when the processor was never initialized (no queue or no
+     * session has been created yet).
+     *
+     * @param processContext
+     *            instance of {@link ProcessContext}
+     */
+    private void flushRemainingMessages(ProcessContext processContext) {
+        if (this.messageQueue == null || this.processSession == null) {
+            return;
+        }
+        // NOTE(review): processSession is the session handed to the first
+        // onTrigger call - confirm it is still valid when the processor stops.
+        Message emailMessage;
+        try {
+            while ((emailMessage = this.messageQueue.poll(1, TimeUnit.MILLISECONDS)) != null) {
+                this.transfer(emailMessage, processContext, this.processSession);
+                this.processSession.commit();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            this.logger.debug("Current thread is interrupted");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java
 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java
new file mode 100644
index 0000000..62e6f5d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.email;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.springframework.integration.mail.ImapMailReceiver;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Consumes messages from Email Server using IMAP protocol. "
+        + "The raw-bytes of each received email message are written as contents of the FlowFile")
+@Tags({ "Email", "Imap", "Get", "Ingest", "Ingress", "Message", "Consume" })
+public class ConsumeIMAP extends AbstractEmailProcessor<ImapMailReceiver> {
+
+    /** Whether retrieved messages are marked as read; defaults to "false". */
+    public static final PropertyDescriptor SHOULD_MARK_READ = new PropertyDescriptor.Builder()
+            .name("Mark Messages as Read")
+            .description("Specify if messages should be marked as read after retrieval.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    /** Selects between the "imaps" and "imap" protocols; defaults to "true". */
+    public static final PropertyDescriptor USE_SSL = new PropertyDescriptor.Builder()
+            .name("Use SSL")
+            .description("Specifies if IMAP connection must be obtained via SSL encrypted connection (i.e., IMAPS)")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    /** Shared descriptors followed by the IMAP-specific ones; unmodifiable. */
+    static final List<PropertyDescriptor> DESCRIPTORS;
+
+    static {
+        final List<PropertyDescriptor> all = new ArrayList<>(SHARED_DESCRIPTORS);
+        all.add(SHOULD_MARK_READ);
+        all.add(USE_SSL);
+        DESCRIPTORS = Collections.unmodifiableList(all);
+    }
+
+    /**
+     * Creates an {@link ImapMailReceiver} for the URL built from the
+     * processor properties and applies the "mark as read" setting to it.
+     */
+    @Override
+    protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) {
+        final String url = this.buildUrl(processContext);
+        final ImapMailReceiver imapReceiver = new ImapMailReceiver(url);
+        imapReceiver.setShouldMarkMessagesAsRead(processContext.getProperty(SHOULD_MARK_READ).asBoolean());
+        return imapReceiver;
+    }
+
+    /**
+     * Returns "imaps" when the "Use SSL" property is true, "imap" otherwise.
+     */
+    @Override
+    protected String getProtocol(ProcessContext processContext) {
+        if (processContext.getProperty(USE_SSL).asBoolean()) {
+            return "imaps";
+        }
+        return "imap";
+    }
+
+    /**
+     * Returns the unmodifiable list of descriptors supported by this
+     * processor.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java
 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java
new file mode 100644
index 0000000..6deab39
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.email;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.springframework.integration.mail.Pop3MailReceiver;
+
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@CapabilityDescription("Consumes messages from Email Server using POP3 protocol. "
+        + "The raw-bytes of each received email message are written as contents of the FlowFile")
+@Tags({ "Email", "POP3", "Get", "Ingest", "Ingress", "Message", "Consume" })
+public class ConsumePOP3 extends AbstractEmailProcessor<Pop3MailReceiver> {
+
+    /** Unmodifiable copy of the shared descriptors; this processor adds none of its own. */
+    static final List<PropertyDescriptor> DESCRIPTORS;
+
+    static {
+        final List<PropertyDescriptor> pop3Descriptors = new ArrayList<>(SHARED_DESCRIPTORS);
+        DESCRIPTORS = Collections.unmodifiableList(pop3Descriptors);
+    }
+
+    /**
+     * Returns the unmodifiable DESCRIPTORS list.
+     *
+     * @return the property descriptors supported by this processor
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    /**
+     * Creates a {@link Pop3MailReceiver} for the URL produced by {@code buildUrl}.
+     *
+     * @param context the context handed to {@code buildUrl}
+     * @return a new receiver for that URL
+     */
+    @Override
+    protected Pop3MailReceiver buildMessageReceiver(ProcessContext context) {
+        final Pop3MailReceiver pop3Receiver = new Pop3MailReceiver(this.buildUrl(context));
+        return pop3Receiver;
+    }
+
+    /**
+     * Reports the protocol name; the supplied context is not consulted.
+     *
+     * @param processContext unused
+     * @return always "pop3"
+     */
+    @Override
+    protected String getProtocol(ProcessContext processContext) {
+        return "pop3";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 7a1f644..c6bf6d8 100644
--- 
a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,3 +15,5 @@
 org.apache.nifi.processors.email.ExtractEmailAttachments
 org.apache.nifi.processors.email.ExtractEmailHeaders
 org.apache.nifi.processors.email.ListenSMTP
+org.apache.nifi.processors.email.ConsumeIMAP
+org.apache.nifi.processors.email.ConsumePOP3

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumeIMAP/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumeIMAP/additionalDetails.html
 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumeIMAP/additionalDetails.html
new file mode 100644
index 0000000..3be9a84
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumeIMAP/additionalDetails.html
@@ -0,0 +1,58 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+<head>
+<meta charset="utf-8" />
+<title>ConsumeIMAP</title>
+<link rel="stylesheet" href="../../css/component-usage.css"
+       type="text/css" />
+</head>
+
+<body>
+       <!-- Processor Documentation 
================================================== -->
+       <h2>Description:</h2>
+       <p>This Processor consumes email messages via IMAP protocol and sends 
the content of an email message as content of the Flow File. 
+          Content of the incoming email message is written as raw bytes to the 
content of the outgoing Flow File.
+    </p>
+
+       <p>Different email providers may require additional Java Mail 
properties which could be provided as dynamic properties.
+       For example, below is a sample configuration for GMail:
+
+       </p>
+       <p>
+               <b>Processor's static properties:</b>
+               <ul>
+                       <li><b>Host Name</b> - imap.gmail.com</li>
+                       <li><b>Port</b> - 993</li>
+                       <li><b>User Name</b> - <i>[your user name]</i></li>
+                       <li><b>Password</b> - <i>[your password]</i></li>
+                       <li><b>Folder</b> - INBOX</li>
+               </ul>
+               <b>Processor's dynamic properties:</b>
+               <ul>
+                       <li><b>mail.imap.socketFactory.class</b> - 
javax.net.ssl.SSLSocketFactory</li>
+                       <li><b>mail.imap.socketFactory.fallback</b> - false</li>
+                       <li><b>mail.store.protocol</b> - imaps</li>
+               </ul>
+       </p>
+       <p>
+       Another useful property is <b>mail.debug</b> which allows Java Mail API 
to print protocol messages to the console helping you to both understand what's 
going on as well as debug issues.
+       </p>
+       <p>
+       For the full list of available Java Mail properties please refer to <a 
href="http://connector.sourceforge.net/doc-files/Properties.html">here</a>
+       </p>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumePOP3/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumePOP3/additionalDetails.html
 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumePOP3/additionalDetails.html
new file mode 100644
index 0000000..55a2220
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumePOP3/additionalDetails.html
@@ -0,0 +1,57 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+<head>
+<meta charset="utf-8" />
+<title>ConsumePOP3</title>
+<link rel="stylesheet" href="../../css/component-usage.css"
+       type="text/css" />
+</head>
+
+<body>
+       <!-- Processor Documentation 
================================================== -->
+       <h2>Description:</h2>
+       <p>This Processor consumes email messages via POP3 protocol and sends 
the content of an email message as content of the Flow File. 
+          Content of the incoming email message is written as raw bytes to the 
content of the outgoing Flow File.
+    </p>
+
+       <p>Since different servers may require different Java Mail
+               properties, such properties can be provided via dynamic 
properties.
+               For example, below is a sample configuration for GMail:
+       </p>
+       <p>
+               <b>Processor's static properties:</b>
+               <ul>
+                       <li><b>Host Name</b> - pop.gmail.com</li>
+                       <li><b>Port</b> - 995</li>
+                       <li><b>User Name</b> - <i>[your user name]</i></li>
+                       <li><b>Password</b> - <i>[your password]</i></li>
+                       <li><b>Folder</b> - INBOX</li>
+               </ul>
+               <b>Processor's dynamic properties:</b>
+               <ul>
+                       <li><b>mail.pop3.socketFactory.class</b> - 
javax.net.ssl.SSLSocketFactory</li>
+                       <li><b>mail.pop3.socketFactory.fallback</b> - false</li>
+               </ul>
+       </p>
+       <p>
+       Another useful property is <b>mail.debug</b>, which makes the Java 
Mail API print protocol messages to the console, helping you both 
understand what's going on and debug issues.
+       </p>
+       <p>
+       For the full list of available Java Mail properties please refer to <a 
href="http://connector.sourceforge.net/doc-files/Properties.html">here</a>
+       </p>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java
 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java
new file mode 100644
index 0000000..84a78f5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.email;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import javax.mail.Message;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.springframework.integration.mail.AbstractMailReceiver;
+import org.springframework.integration.mail.ImapMailReceiver;
+
+public class ConsumeEmailTest {
+
+    @Test
+    public void validateProtocol() {
+        AbstractEmailProcessor<? extends AbstractMailReceiver> consume = new 
ConsumeIMAP();
+        TestRunner runner = TestRunners.newTestRunner(consume);
+        runner.setProperty(ConsumeIMAP.USE_SSL, "false");
+
+        assertEquals("imap", consume.getProtocol(runner.getProcessContext()));
+
+        runner = TestRunners.newTestRunner(consume);
+        runner.setProperty(ConsumeIMAP.USE_SSL, "true");
+
+        assertEquals("imaps", consume.getProtocol(runner.getProcessContext()));
+
+        consume = new ConsumePOP3();
+
+        assertEquals("pop3", consume.getProtocol(runner.getProcessContext()));
+    }
+
+    @Test
+    public void validateUrl() throws Exception {
+        Field displayUrlField = 
AbstractEmailProcessor.class.getDeclaredField("displayUrl");
+        displayUrlField.setAccessible(true);
+
+        AbstractEmailProcessor<? extends AbstractMailReceiver> consume = new 
ConsumeIMAP();
+        TestRunner runner = TestRunners.newTestRunner(consume);
+        runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com");
+        runner.setProperty(ConsumeIMAP.PORT, "1234");
+        runner.setProperty(ConsumeIMAP.USER, "jon");
+        runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr");
+        runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX");
+        runner.setProperty(ConsumeIMAP.USE_SSL, "false");
+
+        assertEquals("imap://jon:qhgwjg...@foo.bar.com:1234/MYBOX", 
consume.buildUrl(runner.getProcessContext()));
+        assertEquals("imap://jon:[password]@foo.bar.com:1234/MYBOX", 
displayUrlField.get(consume));
+    }
+
+    @Test
+    public void validateConsumeIMAP() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(new 
TestImapProcessor(0));
+        runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com");
+        runner.setProperty(ConsumeIMAP.PORT, "1234");
+        runner.setProperty(ConsumeIMAP.USER, "jon");
+        runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr");
+        runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX");
+        runner.setProperty(ConsumeIMAP.USE_SSL, "false");
+        runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false");
+
+        runner.run();
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS);
+        assertTrue(flowFiles.isEmpty());
+
+        runner = TestRunners.newTestRunner(new TestImapProcessor(2));
+        runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com");
+        runner.setProperty(ConsumeIMAP.PORT, "1234");
+        runner.setProperty(ConsumeIMAP.USER, "jon");
+        runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr");
+        runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX");
+        runner.setProperty(ConsumeIMAP.USE_SSL, "false");
+        runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false");
+
+        runner.run(2);
+        flowFiles = 
runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS);
+        assertTrue(flowFiles.size() == 2);
+        MockFlowFile ff = flowFiles.get(0);
+        ff.assertContentEquals("You've Got Mail - 
0".getBytes(StandardCharsets.UTF_8));
+        ff = flowFiles.get(1);
+        ff.assertContentEquals("You've Got Mail - 
1".getBytes(StandardCharsets.UTF_8));
+    }
+
+    public static class TestImapProcessor extends ConsumeIMAP {
+        private final int messagesToGenerate;
+        TestImapProcessor(int messagesToGenerate) {
+            this.messagesToGenerate = messagesToGenerate;
+        }
+        @Override
+        protected ImapMailReceiver buildMessageReceiver(ProcessContext 
processContext) {
+            ImapMailReceiver receiver = mock(ImapMailReceiver.class);
+            try {
+                Message[] messages = new Message[this.messagesToGenerate];
+                for (int i = 0; i < this.messagesToGenerate; i++) {
+                    Message message = mock(Message.class);
+                    when(message.getInputStream()).thenReturn(
+                            new ByteArrayInputStream(("You've Got Mail - " + 
i).getBytes(StandardCharsets.UTF_8)));
+                    messages[i] = message;
+                }
+                when(receiver.receive()).thenReturn(messages);
+            } catch (Exception e) {
+                e.printStackTrace();
+                fail();
+            }
+            return receiver;
+        }
+    }
+}

Reply via email to