Repository: nifi Updated Branches: refs/heads/master 120d2100a -> 9b647cd53
NIFI-1148 added IMAP/POP3 support added initial set of processors to support consumption of Email via IMAP/IMAPS and POP3 protocols Signed-off-by: Matt Burgess <mattyb...@apache.org> NIFI-1148 addressed PR comments from @trixpan NIFI-1148 addressing PR comments NIFI-1148 addressed PR comments This closes #710 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9b647cd5 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9b647cd5 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9b647cd5 Branch: refs/heads/master Commit: 9b647cd53842e442e294dc62033c8e6df6078b81 Parents: 120d210 Author: Oleg Zhurakousky <o...@suitcase.io> Authored: Sat Jul 23 10:42:40 2016 -0400 Committer: Matt Burgess <mattyb...@apache.org> Committed: Mon Aug 8 11:57:28 2016 -0400 ---------------------------------------------------------------------- .../nifi-email-processors/pom.xml | 145 ++++--- .../email/AbstractEmailProcessor.java | 407 +++++++++++++++++++ .../nifi/processors/email/ConsumeIMAP.java | 91 +++++ .../nifi/processors/email/ConsumePOP3.java | 68 ++++ .../org.apache.nifi.processor.Processor | 2 + .../additionalDetails.html | 58 +++ .../additionalDetails.html | 57 +++ .../nifi/processors/email/ConsumeEmailTest.java | 135 ++++++ 8 files changed, 896 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml index 655ac86..75cd3d2 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml @@ -1,73 +1,84 @@ <?xml version="1.0" encoding="UTF-8"?> -<!-- 
Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - You under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-email-bundle</artifactId> - <version>1.0.0-SNAPSHOT</version> - </parent> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-email-bundle</artifactId> + <version>1.0.0-SNAPSHOT</version> + </parent> - <artifactId>nifi-email-processors</artifactId> - <packaging>jar</packaging> + <artifactId>nifi-email-processors</artifactId> + <packaging>jar</packaging> - <dependencies> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-processor-utils</artifactId> - </dependency> - <dependency> - <groupId>javax.mail</groupId> - <artifactId>mail</artifactId> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-email</artifactId> - <version>1.4</version> - </dependency> - <dependency> - <groupId>org.subethamail</groupId> - <artifactId>subethasmtp</artifactId> - <version>3.1.7</version> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-ssl-context-service-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-ssl-context-service</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-mock</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <configuration> - </configuration> - </plugin> - </plugins> - </build> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + 
</dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>javax.mail</groupId> + <artifactId>mail</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-email</artifactId> + <version>1.4</version> + </dependency> + <dependency> + <groupId>org.subethamail</groupId> + <artifactId>subethasmtp</artifactId> + <version>3.1.7</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.integration</groupId> + <artifactId>spring-integration-mail</artifactId> + <version>4.3.0.RELEASE</version> + </dependency> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + <version>1.2</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + </configuration> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java new file mode 100644 index 0000000..e7fd27c --- 
/dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.email; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.mail.Address; +import javax.mail.Flags; +import javax.mail.Message; +import javax.mail.MessagingException; + +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import 
org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.support.StaticListableBeanFactory; +import org.springframework.integration.mail.AbstractMailReceiver; +import org.springframework.util.Assert; +import org.springframework.util.StreamUtils; + +/** + * Base processor for implementing processors to consume messages from Email + * servers using Spring Integration libraries. + * + * @param <T> + * the type of {@link AbstractMailReceiver}. + */ +abstract class AbstractEmailProcessor<T extends AbstractMailReceiver> extends AbstractProcessor { + + public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder() + .name("host") + .displayName("Host Name") + .description("Network address of Email server (e.g., pop.gmail.com, imap.gmail.com . . .)") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("port") + .displayName("Port") + .description("Numeric value identifying Port of Email server (e.g., 993)") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + public static final PropertyDescriptor USER = new PropertyDescriptor.Builder() + .name("user") + .displayName("User Name") + .description("User Name used for authentication and authorization with Email server.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("password") + .displayName("Password") + .description("Password used for authentication and authorization with Email server.") + .required(true) + 
.expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor FOLDER = new PropertyDescriptor.Builder() + .name("folder") + .displayName("Folder") + .description("Email folder to retrieve messages from (e.g., INBOX)") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("INBOX") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() + .name("fetch.size") + .displayName("Fetch Size") + .description("Specify the maximum number of Messages to fetch per call to Email Server.") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("10") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor SHOULD_DELETE_MESSAGES = new PropertyDescriptor.Builder() + .name("delete.messages") + .displayName("Delete Messages") + .description("Specify whether mail messages should be deleted after retrieval.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All messages that are the are successfully received from Email server and converted to FlowFiles are routed to this relationship") + .build(); + + static List<PropertyDescriptor> SHARED_DESCRIPTORS = new ArrayList<>(); + + static Set<Relationship> SHARED_RELATIONSHIPS = new HashSet<>(); + + /* + * Will ensure that list of PropertyDescriptors is build only once, since + * all other lifecycle methods are invoked multiple times. 
+ */ + static { + SHARED_DESCRIPTORS.add(HOST); + SHARED_DESCRIPTORS.add(PORT); + SHARED_DESCRIPTORS.add(USER); + SHARED_DESCRIPTORS.add(PASSWORD); + SHARED_DESCRIPTORS.add(FOLDER); + SHARED_DESCRIPTORS.add(FETCH_SIZE); + SHARED_DESCRIPTORS.add(SHOULD_DELETE_MESSAGES); + + SHARED_RELATIONSHIPS.add(REL_SUCCESS); + } + + protected final Logger logger = LoggerFactory.getLogger(this.getClass()); + + protected volatile T messageReceiver; + + private volatile BlockingQueue<Message> messageQueue; + + private volatile String displayUrl; + + private volatile ProcessSession processSession; + + private volatile boolean shouldSetDeleteFlag; + + @OnStopped + public void stop(ProcessContext processContext) { + this.flushRemainingMessages(processContext); + try { + this.messageReceiver.destroy(); + this.messageReceiver = null; + } catch (Exception e) { + this.logger.warn("Failure while closing processor", e); + } + } + + /** + * + */ + @Override + public Set<Relationship> getRelationships() { + return SHARED_RELATIONSHIPS; + } + + /** + * + */ + @Override + public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException { + this.initializeIfNecessary(context, processSession); + + Message emailMessage = this.receiveMessage(); + if (emailMessage != null) { + this.transfer(emailMessage, context, processSession); + } + } + + /** + * + */ + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' Java Mail property.") + .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) + .build(); + } + + /** + * Delegates to sub-classes to build the target receiver as + * {@link AbstractMailReceiver} + * + * @param context + * instance of {@link ProcessContext} + * @return new instance of {@link AbstractMailReceiver} + */ + protected abstract T 
buildMessageReceiver(ProcessContext context); + + /** + * Return the target receivere's mail protocol (e.g., imap, pop etc.) + */ + protected abstract String getProtocol(ProcessContext processContext); + + /** + * Builds the url used to connect to the email server. + */ + String buildUrl(ProcessContext processContext) { + String host = processContext.getProperty(HOST).evaluateAttributeExpressions().getValue(); + String port = processContext.getProperty(PORT).evaluateAttributeExpressions().getValue(); + String user = processContext.getProperty(USER).evaluateAttributeExpressions().getValue(); + String password = processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); + String folder = processContext.getProperty(FOLDER).evaluateAttributeExpressions().getValue(); + + StringBuilder urlBuilder = new StringBuilder(); + try { + urlBuilder.append(URLEncoder.encode(user, "UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new ProcessException(e); + } + urlBuilder.append(":"); + try { + urlBuilder.append(URLEncoder.encode(password, "UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new ProcessException(e); + } + urlBuilder.append("@"); + urlBuilder.append(host); + urlBuilder.append(":"); + urlBuilder.append(port); + urlBuilder.append("/"); + urlBuilder.append(folder); + + String protocol = this.getProtocol(processContext); + String finalUrl = protocol + "://" + urlBuilder.toString(); + + // build display-safe URL + int passwordStartIndex = urlBuilder.indexOf(":") + 1; + int passwordEndIndex = urlBuilder.indexOf("@"); + urlBuilder.replace(passwordStartIndex, passwordEndIndex, "[password]"); + this.displayUrl = protocol + "://" + urlBuilder.toString(); + if (this.logger.isInfoEnabled()) { + this.logger.info("Connecting to Email server at the following URL: " + this.displayUrl); + } + + return finalUrl; + } + + /** + * Builds and initializes the target message receiver if necessary (if it's + * null). 
Upon execution of this operation the receiver is fully functional + * and is ready to receive messages. + */ + private synchronized void initializeIfNecessary(ProcessContext context, ProcessSession processSession) { + if (this.messageReceiver == null) { + this.processSession = processSession; + this.messageReceiver = this.buildMessageReceiver(context); + + this.shouldSetDeleteFlag = context.getProperty(SHOULD_DELETE_MESSAGES).asBoolean(); + int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger(); + + this.messageReceiver.setMaxFetchSize(fetchSize); + this.messageReceiver.setJavaMailProperties(this.buildJavaMailProperties(context)); + // need to avoid spring warning messages + this.messageReceiver.setBeanFactory(new StaticListableBeanFactory()); + this.messageReceiver.afterPropertiesSet(); + + this.messageQueue = new ArrayBlockingQueue<>(fetchSize); + } + } + + /** + * Extracts dynamic properties which typically represent the Java Mail + * properties from the {@link ProcessContext} returnining them as instance + * of {@link Properties} + */ + private Properties buildJavaMailProperties(ProcessContext context) { + Properties javaMailProperties = new Properties(); + for (Entry<PropertyDescriptor, String> propertyDescriptorEntry : context.getProperties().entrySet()) { + if (propertyDescriptorEntry.getKey().isDynamic()) { + javaMailProperties.setProperty(propertyDescriptorEntry.getKey().getName(), + propertyDescriptorEntry.getValue()); + } + } + return javaMailProperties; + } + + /** + * Fills the internal message queue if such queue is empty. This is due to + * the fact that per single session there may be multiple messages retrieved + * from the email server (see FETCH_SIZE). 
+ */ + private synchronized void fillMessageQueueIfNecessary() { + if (this.messageQueue.isEmpty()) { + Object[] messages; + try { + messages = this.messageReceiver.receive(); + } catch (MessagingException e) { + String errorMsg = "Failed to receive messages from Email server: [" + e.getClass().getName() + + " - " + e.getMessage(); + this.getLogger().error(errorMsg); + throw new ProcessException(errorMsg, e); + } + + if (messages != null) { + for (Object message : messages) { + Assert.isTrue(message instanceof Message, "Message is not an instance of javax.mail.Message"); + this.messageQueue.offer((Message) message); + } + } + } + } + + /** + * Disposes the message by converting it to a {@link FlowFile} transferring + * it to the REL_SUCCESS relationship. + */ + private void transfer(Message emailMessage, ProcessContext context, ProcessSession processSession) { + long start = System.nanoTime(); + FlowFile flowFile = processSession.create(); + + flowFile = processSession.append(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try { + StreamUtils.copy(emailMessage.getInputStream(), out); + } catch (MessagingException e) { + throw new IOException(e); + } + } + }); + + long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + + String fromAddressesString = ""; + try { + Address[] fromAddresses = emailMessage.getFrom(); + if (fromAddresses != null) { + fromAddressesString = Arrays.asList(fromAddresses).toString(); + } + } catch (MessagingException e) { + this.logger.warn("Faild to retrieve 'From' attribute from Message."); + } + + processSession.getProvenanceReporter().receive(flowFile, this.displayUrl, "Received message from " + fromAddressesString, executionDuration); + this.getLogger().info("Successfully received {} from {} in {} millis", new Object[] { flowFile, fromAddressesString, executionDuration }); + processSession.transfer(flowFile, REL_SUCCESS); + + try { + 
emailMessage.setFlag(Flags.Flag.DELETED, this.shouldSetDeleteFlag); + } catch (MessagingException e) { + this.logger.warn("Failed to set DELETE Flag on the message", e); + this.getLogger().warn("Failed to set DELETE Flag on the message"); + } + } + + /** + * Receives message from the internal queue filling up the queue if + * necessary. + */ + private Message receiveMessage() { + Message emailMessage = null; + try { + this.fillMessageQueueIfNecessary(); + emailMessage = this.messageQueue.poll(1, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + this.logger.debug("Current thread is interrupted"); + } + return emailMessage; + } + + /** + * Will flush the remaining messages when this processor is stopped. The + * flushed messages are disposed via + * {@link #disposeMessage(Message, ProcessContext, ProcessSession)} + * operation + */ + private void flushRemainingMessages(ProcessContext processContext) { + Message emailMessage; + try { + while ((emailMessage = this.messageQueue.poll(1, TimeUnit.MILLISECONDS)) != null) { + this.transfer(emailMessage, processContext, this.processSession); + this.processSession.commit(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + this.logger.debug("Current thread is interrupted"); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java new file mode 100644 index 0000000..62e6f5d --- /dev/null +++ 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumeIMAP.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.email; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.springframework.integration.mail.ImapMailReceiver; + +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Consumes messages from Email Server using IMAP protocol. 
" + + "The raw-bytes of each received email message are written as contents of the FlowFile") +@Tags({ "Email", "Imap", "Get", "Ingest", "Ingress", "Message", "Consume" }) +public class ConsumeIMAP extends AbstractEmailProcessor<ImapMailReceiver> { + + public static final PropertyDescriptor SHOULD_MARK_READ = new PropertyDescriptor.Builder() + .name("Mark Messages as Read") + .description("Specify if messages should be marked as read after retrieval.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final PropertyDescriptor USE_SSL = new PropertyDescriptor.Builder() + .name("Use SSL") + .description("Specifies if IMAP connection must be obtained via SSL encrypted connection (i.e., IMAPS)") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + static final List<PropertyDescriptor> DESCRIPTORS; + + static { + List<PropertyDescriptor> _descriptors = new ArrayList<>(); + _descriptors.addAll(SHARED_DESCRIPTORS); + _descriptors.add(SHOULD_MARK_READ); + _descriptors.add(USE_SSL); + DESCRIPTORS = Collections.unmodifiableList(_descriptors); + } + + /** + * + */ + @Override + protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) { + ImapMailReceiver receiver = new ImapMailReceiver(this.buildUrl(processContext)); + boolean shouldMarkAsRead = processContext.getProperty(SHOULD_MARK_READ).asBoolean(); + receiver.setShouldMarkMessagesAsRead(shouldMarkAsRead); + return receiver; + } + + /** + * + */ + @Override + protected String getProtocol(ProcessContext processContext) { + return processContext.getProperty(USE_SSL).asBoolean() ? 
"imaps" : "imap"; + } + + /** + * + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java new file mode 100644 index 0000000..6deab39 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/ConsumePOP3.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.email; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.springframework.integration.mail.Pop3MailReceiver; + +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Consumes messages from Email Server using POP3 protocol. " + + "The raw-bytes of each received email message are written as contents of the FlowFile") +@Tags({ "Email", "POP3", "Get", "Ingest", "Ingress", "Message", "Consume" }) +public class ConsumePOP3 extends AbstractEmailProcessor<Pop3MailReceiver> { + + static final List<PropertyDescriptor> DESCRIPTORS; + + static { + List<PropertyDescriptor> _descriptors = new ArrayList<>(); + _descriptors.addAll(SHARED_DESCRIPTORS); + DESCRIPTORS = Collections.unmodifiableList(_descriptors); + } + + /** + * + */ + @Override + protected String getProtocol(ProcessContext processContext) { + return "pop3"; + } + + /** + * + */ + @Override + protected Pop3MailReceiver buildMessageReceiver(ProcessContext context) { + return new Pop3MailReceiver(this.buildUrl(context)); + } + + /** + * + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor 
b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 7a1f644..c6bf6d8 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -15,3 +15,5 @@ org.apache.nifi.processors.email.ExtractEmailAttachments org.apache.nifi.processors.email.ExtractEmailHeaders org.apache.nifi.processors.email.ListenSMTP +org.apache.nifi.processors.email.ConsumeIMAP +org.apache.nifi.processors.email.ConsumePOP3 http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumeIMAP/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumeIMAP/additionalDetails.html b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumeIMAP/additionalDetails.html new file mode 100644 index 0000000..3be9a84 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumeIMAP/additionalDetails.html @@ -0,0 +1,58 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<head> +<meta charset="utf-8" /> +<title>ConsumeIMAP</title> +<link rel="stylesheet" href="../../css/component-usage.css" + type="text/css" /> +</head> + +<body> + <!-- Processor Documentation ================================================== --> + <h2>Description:</h2> + <p>This Processor consumes email messages via IMAP protocol and sends the content of an email message as content of the Flow File. + Content of the incoming email message is written as raw bytes to the content of the outgoing Flow File. + </p> + + <p>Different email providers may require additional Java Mail properties which could be provided as dynamic properties. + For example, below is a sample configuration for GMail: + + </p> + <p> + <b>Processor's static properties:</b> + <ul> + <li><b>Host Name</b> - imap.gmail.com</li> + <li><b>Port</b> - 993</li> + <li><b>User Name</b> - <i>[your user name]</i></li> + <li><b>Password</b> - <i>[your password]</i></li> + <li><b>Folder</b> - INBOX</li> + </ul> + <b>Processor's dynamic properties:</b> + <ul> + <li><b>mail.imap.socketFactory.class</b> - javax.net.ssl.SSLSocketFactory</li> + <li><b>mail.imap.socketFactory.fallback</b> - false</li> + <li><b>mail.store.protocol</b> - imaps</li> + </ul> + </p> + <p> + Another useful property is <b>mail.debug</b> which allows Java Mail API to print protocol messages to the console helping you to both understand what's going on as well as debug issues. 
+ </p> + <p> + For the full list of available Java Mail properties please refer to <a href="http://connector.sourceforge.net/doc-files/Properties.html">here</a> + </p> +</body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumePOP3/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumePOP3/additionalDetails.html b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumePOP3/additionalDetails.html new file mode 100644 index 0000000..55a2220 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/resources/docs/org.apache.nifi.processors.email.ConsumePOP3/additionalDetails.html @@ -0,0 +1,57 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<head> +<meta charset="utf-8" /> +<title>ConsumePOP3</title> +<link rel="stylesheet" href="../../css/component-usage.css" + type="text/css" /> +</head> + +<body> + <!-- Processor Documentation ================================================== --> + <h2>Description:</h2> + <p>This Processor consumes email messages via POP3 protocol and sends the content of an email message as content of the Flow File. + Content of the incoming email message is written as raw bytes to the content of the outgoing Flow File. + </p> + + <p>Since different servers may require different Java Mail + properties such properties could be provided via dynamic properties. + For example, below is a sample configuration for GMail: + </p> + <p> + <b>Processor's static properties:</b> + <ul> + <li><b>Host Name</b> - pop.gmail.com</li> + <li><b>Port</b> - 995</li> + <li><b>User Name</b> - <i>[your user name]</i></li> + <li><b>Password</b> - <i>[your password]</i></li> + <li><b>Folder</b> - INBOX</li> + </ul> + <b>Processor's dynamic properties:</b> + <ul> + <li><b>mail.pop3.socketFactory.class</b> - javax.net.ssl.SSLSocketFactory</li> + <li><b>mail.pop3.socketFactory.fallback</b> - false</li> + </ul> + </p> + <p> + Another useful property is <b>mail.debug</b> which allows Java Mail API to print protocol messages to the console helping you to both understand what's going on as well as debug issues. 
+ </p> + <p> + For the full list of available Java Mail properties please refer to <a href="http://connector.sourceforge.net/doc-files/Properties.html">here</a> + </p> +</body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/9b647cd5/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java new file mode 100644 index 0000000..84a78f5 --- /dev/null +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/test/java/org/apache/nifi/processors/email/ConsumeEmailTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.email; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import javax.mail.Message; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import org.springframework.integration.mail.AbstractMailReceiver; +import org.springframework.integration.mail.ImapMailReceiver; + +public class ConsumeEmailTest { + + @Test + public void validateProtocol() { + AbstractEmailProcessor<? extends AbstractMailReceiver> consume = new ConsumeIMAP(); + TestRunner runner = TestRunners.newTestRunner(consume); + runner.setProperty(ConsumeIMAP.USE_SSL, "false"); + + assertEquals("imap", consume.getProtocol(runner.getProcessContext())); + + runner = TestRunners.newTestRunner(consume); + runner.setProperty(ConsumeIMAP.USE_SSL, "true"); + + assertEquals("imaps", consume.getProtocol(runner.getProcessContext())); + + consume = new ConsumePOP3(); + + assertEquals("pop3", consume.getProtocol(runner.getProcessContext())); + } + + @Test + public void validateUrl() throws Exception { + Field displayUrlField = AbstractEmailProcessor.class.getDeclaredField("displayUrl"); + displayUrlField.setAccessible(true); + + AbstractEmailProcessor<? 
extends AbstractMailReceiver> consume = new ConsumeIMAP(); + TestRunner runner = TestRunners.newTestRunner(consume); + runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com"); + runner.setProperty(ConsumeIMAP.PORT, "1234"); + runner.setProperty(ConsumeIMAP.USER, "jon"); + runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr"); + runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); + runner.setProperty(ConsumeIMAP.USE_SSL, "false"); + + assertEquals("imap://jon:qhgwjgehr@foo.bar.com:1234/MYBOX", consume.buildUrl(runner.getProcessContext())); + assertEquals("imap://jon:[password]@foo.bar.com:1234/MYBOX", displayUrlField.get(consume)); + } + + @Test + public void validateConsumeIMAP() throws Exception { + TestRunner runner = TestRunners.newTestRunner(new TestImapProcessor(0)); + runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com"); + runner.setProperty(ConsumeIMAP.PORT, "1234"); + runner.setProperty(ConsumeIMAP.USER, "jon"); + runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr"); + runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); + runner.setProperty(ConsumeIMAP.USE_SSL, "false"); + runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false"); + + runner.run(); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS); + assertTrue(flowFiles.isEmpty()); + + runner = TestRunners.newTestRunner(new TestImapProcessor(2)); + runner.setProperty(ConsumeIMAP.HOST, "foo.bar.com"); + runner.setProperty(ConsumeIMAP.PORT, "1234"); + runner.setProperty(ConsumeIMAP.USER, "jon"); + runner.setProperty(ConsumeIMAP.PASSWORD, "qhgwjgehr"); + runner.setProperty(ConsumeIMAP.FOLDER, "MYBOX"); + runner.setProperty(ConsumeIMAP.USE_SSL, "false"); + runner.setProperty(ConsumeIMAP.SHOULD_DELETE_MESSAGES, "false"); + + runner.run(2); + flowFiles = runner.getFlowFilesForRelationship(ConsumeIMAP.REL_SUCCESS); + assertTrue(flowFiles.size() == 2); + MockFlowFile ff = flowFiles.get(0); + ff.assertContentEquals("You've Got Mail - 0".getBytes(StandardCharsets.UTF_8)); + ff 
= flowFiles.get(1); + ff.assertContentEquals("You've Got Mail - 1".getBytes(StandardCharsets.UTF_8)); + } + + public static class TestImapProcessor extends ConsumeIMAP { + private final int messagesToGenerate; + TestImapProcessor(int messagesToGenerate) { + this.messagesToGenerate = messagesToGenerate; + } + @Override + protected ImapMailReceiver buildMessageReceiver(ProcessContext processContext) { + ImapMailReceiver receiver = mock(ImapMailReceiver.class); + try { + Message[] messages = new Message[this.messagesToGenerate]; + for (int i = 0; i < this.messagesToGenerate; i++) { + Message message = mock(Message.class); + when(message.getInputStream()).thenReturn( + new ByteArrayInputStream(("You've Got Mail - " + i).getBytes(StandardCharsets.UTF_8))); + messages[i] = message; + } + when(receiver.receive()).thenReturn(messages); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + return receiver; + } + } +}