[GitHub] nifi pull request #856: NIFI-2519 aligned processor to leverage the subethas...

2016-08-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/856


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #856: NIFI-2519 aligned processor to leverage the subethas...

2016-08-14 Thread trixpan
Github user trixpan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/856#discussion_r74700141
  
--- Diff: 
nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/smtp/SmtpConsumer.java
 ---
@@ -0,0 +1,161 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.nifi.processors.email.smtp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processors.email.ListenSMTP;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.util.StopWatch;
+
+import org.subethamail.smtp.MessageContext;
+import org.subethamail.smtp.MessageHandler;
+import org.subethamail.smtp.RejectException;
+import org.subethamail.smtp.TooMuchDataException;
+import org.subethamail.smtp.server.SMTPServer;
+
+/**
+ * A simple consumer that provides a bridge between 'push' message 
distribution
+ * provided by {@link SMTPServer} and NiFi polling scheduler mechanism.
+ */
+public class SmtpConsumer implements MessageHandler {
+
+private String from = null;
+private final List recipientList = new ArrayList<>();
+private final MessageContext context;
+private final ProcessSessionFactory sessionFactory;
+private final int port;
+private final int maxMessageSize;
+private final ComponentLog log;
+private final String host;
+
+public SmtpConsumer(
+final MessageContext context,
+final ProcessSessionFactory sessionFactory,
+final int port,
+final String host,
+final ComponentLog log,
+final int maxMessageSize
+) {
+this.context = context;
+this.sessionFactory = sessionFactory;
+this.port = port;
+if (host == null || host.trim().isEmpty()) {
+this.host = context.getSMTPServer().getHostName();
+} else {
+this.host = host;
+}
+this.log = log;
+this.maxMessageSize = maxMessageSize;
+}
+
+String getFrom() {
+return from;
+}
+
+List getRecipients() {
+return Collections.unmodifiableList(recipientList);
+}
+
+@Override
+public void data(final InputStream data) throws RejectException, 
TooMuchDataException, IOException {
+final ProcessSession processSession = 
sessionFactory.createSession();
+final StopWatch watch = new StopWatch();
+watch.start();
+try {
+FlowFile flowFile = processSession.create();
+final AtomicBoolean limitExceeded = new AtomicBoolean(false);
+flowFile = processSession.write(flowFile, (OutputStream out) 
-> {
+final LimitingInputStream lis = new 
LimitingInputStream(data, maxMessageSize);
+IOUtils.copy(lis, out);
+if (lis.hasReachedLimit()) {
+limitExceeded.set(true);
+}
+});
+if (limitExceeded.get()) {
+throw new TooMuchDataException("Maximum message size limit 
reached - client 

[GitHub] nifi pull request #856: NIFI-2519 aligned processor to leverage the subethas...

2016-08-13 Thread joewitt
GitHub user joewitt opened a pull request:

https://github.com/apache/nifi/pull/856

NIFI-2519 aligned processor to leverage the subethasmtp threading model 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joewitt/incubator-nifi NIFI-2519

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/856.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #856


commit c4d750993b46e72506d7e19a5c5591ad1dc3f3b9
Author: Oleg Zhurakousky 
Date:   2016-08-10T14:19:17Z

NIFI-2519 Fixed and refactored ListenSMTP processor
- Removed message queueing which could result in data loss
- Fixed life-cycle issues that coudl put processor in an unstable state
- Fixed PropertyDescriptor translation for Time units and Byte sizes
- Fixed broken tests
- Added additional tests

NIFI-2519 added default for SMTP_MAXIMUM_CONNECTIONS

NIFI-2519 addressed PR comments, polishing
- fixed intermittent deadlock on processor stop and added test for it
- the attributes that can not be extracted from the message but available 
via MessageContext are written into the outgoing FlowFile
- other minor fixes

NIFI-2519 addressed lates PR comments

NIFI-2519 added better messaging when server closes the connection

NIFI-2519 some polishing and additional tests to validate deadlocks

NIFI-2519 address latest PR comments
fixed deadlock condition for when the consumer is stopped while server is 
distributing messages
fixed MAX message size issue ensuring it is validated
set max connections to SMTPServer
polished pom
added L

NIFI-2519 PR comments
- fixed LICENSE
- Added usage of LimitingInputStream
- simplified SmtpConsumer by removing hasMessage operation

commit b7e65001471e83103ebfdfc85b4bdf0fa90820fa
Author: joewitt 
Date:   2016-08-14T05:10:47Z

NIFI-2519 aligned threading model with subethastmp




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---