Github user trixpan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/856#discussion_r74700141
--- Diff:
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java
---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.email.smtp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processors.email.ListenSMTP;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.util.StopWatch;
+
+import org.subethamail.smtp.MessageContext;
+import org.subethamail.smtp.MessageHandler;
+import org.subethamail.smtp.RejectException;
+import org.subethamail.smtp.TooMuchDataException;
+import org.subethamail.smtp.server.SMTPServer;
+
+/**
+ * A simple consumer that provides a bridge between 'push' message
distribution
+ * provided by {@link SMTPServer} and NiFi polling scheduler mechanism.
+ */
+public class SmtpConsumer implements MessageHandler {
+
+private String from = null;
+private final List recipientList = new ArrayList<>();
+private final MessageContext context;
+private final ProcessSessionFactory sessionFactory;
+private final int port;
+private final int maxMessageSize;
+private final ComponentLog log;
+private final String host;
+
+public SmtpConsumer(
+final MessageContext context,
+final ProcessSessionFactory sessionFactory,
+final int port,
+final String host,
+final ComponentLog log,
+final int maxMessageSize
+) {
+this.context = context;
+this.sessionFactory = sessionFactory;
+this.port = port;
+if (host == null || host.trim().isEmpty()) {
+this.host = context.getSMTPServer().getHostName();
+} else {
+this.host = host;
+}
+this.log = log;
+this.maxMessageSize = maxMessageSize;
+}
+
+String getFrom() {
+return from;
+}
+
+List getRecipients() {
+return Collections.unmodifiableList(recipientList);
+}
+
+@Override
+public void data(final InputStream data) throws RejectException,
TooMuchDataException, IOException {
+final ProcessSession processSession =
sessionFactory.createSession();
+final StopWatch watch = new StopWatch();
+watch.start();
+try {
+FlowFile flowFile = processSession.create();
+final AtomicBoolean limitExceeded = new AtomicBoolean(false);
+flowFile = processSession.write(flowFile, (OutputStream out)
-> {
+final LimitingInputStream lis = new
LimitingInputStream(data, maxMessageSize);
+IOUtils.copy(lis, out);
+if (lis.hasReachedLimit()) {
+limitExceeded.set(true);
+}
+});
+if (limitExceeded.get()) {
+throw new TooMuchDataException("Maximum message size limit
reached - client