http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
new file mode 100644
index 0000000..77fbe97
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-aws-bundle</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-aws-nar</artifactId>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-aws-processors</artifactId>
+            <version>0.3.0-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..64d4975
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,74 @@
+nifi-aws-nar
+Copyright 2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+******************
+Apache Software License v2
+******************
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache HttpComponents
+    The following NOTICE information applies:
+      Apache HttpClient
+      Copyright 1999-2014 The Apache Software Foundation
+      
+      Apache HttpCore
+      Copyright 2005-2014 The Apache Software Foundation
+
+      This project contains annotations derived from JCIP-ANNOTATIONS
+      Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
+
+  (ASLv2) Joda Time
+    The following NOTICE information applies:
+      This product includes software developed by
+      Joda.org (http://www.joda.org/).
+
+  (ASLv2) Apache Commons Codec
+    The following NOTICE information applies:
+      Apache Commons Codec
+      Copyright 2002-2014 The Apache Software Foundation
+
+      src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+      contains test data from http://aspell.net/test/orig/batch0.tab.
+      Copyright (C) 2002 Kevin Atkinson (kev...@gnu.org)
+
+      ===============================================================================
+
+      The content of package org.apache.commons.codec.language.bm has been translated
+      from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+      with permission from the original authors.
+      Original source copyright:
+      Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
+  (ASLv2) Apache Commons Logging
+    The following NOTICE information applies:
+      Apache Commons Logging
+      Copyright 2003-2013 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2014 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Amazon Web Services SDK
+    The following NOTICE information applies:
+      Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+      This product includes software developed by
+      Amazon Technologies, Inc (http://www.amazon.com/).
+
+      **********************
+      THIRD PARTY COMPONENTS
+      **********************
+      This software includes third party software subject to the following copyrights:
+      - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
+      - JSON parsing and utility functions from JSON.org - Copyright 2002 JSON.org.
+      - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
new file mode 100644
index 0000000..b784524
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-aws-bundle</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-aws-processors</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/hello.txt</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
new file mode 100644
index 0000000..a781ff9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AnonymousAWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.PropertiesCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+
+public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("FlowFiles are routed to success after being successfully copied to Amazon S3").build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build();
+
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    public static final PropertyDescriptor CREDENTIALS_FILE = new PropertyDescriptor.Builder()
+            .name("Credentials File")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder()
+            .name("Access Key")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+    public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder()
+            .name("Secret Key")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+            .name("Region")
+            .required(true)
+            .allowableValues(getAvailableRegions())
+            .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
+            .build();
+
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    private volatile ClientType client;
+
+    private static AllowableValue createAllowableValue(final Regions regions) {
+        return new AllowableValue(regions.getName(), regions.getName(), regions.getName());
+    }
+
+    private static AllowableValue[] getAvailableRegions() {
+        final List<AllowableValue> values = new ArrayList<>();
+        for (final Regions regions : Regions.values()) {
+            values.add(createAllowableValue(regions));
+        }
+
+        return values.toArray(new AllowableValue[values.size()]);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
+        final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
+        if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) {
+            problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
+        }
+
+        final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet();
+        if ((secretKeySet || accessKeySet) && credentialsFileSet) {
+            problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
+        }
+
+        return problems;
+    }
+
+    protected ClientConfiguration createConfiguration(final ProcessContext context) {
+        final ClientConfiguration config = new ClientConfiguration();
+        config.setMaxConnections(context.getMaxConcurrentTasks());
+        config.setMaxErrorRetry(0);
+        config.setUserAgent("NiFi");
+
+        final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        config.setConnectionTimeout(commsTimeout);
+        config.setSocketTimeout(commsTimeout);
+
+        return config;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context));
+        this.client = awsClient;
+        this.client = awsClient;
+
+        // if the processor supports REGION, get the configured region.
+        if (getSupportedPropertyDescriptors().contains(REGION)) {
+            final String region = context.getProperty(REGION).getValue();
+            if (region != null) {
+                client.setRegion(Region.getRegion(Regions.fromName(region)));
+            }
+        }
+    }
+
+    protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
+
+    protected ClientType getClient() {
+        return client;
+    }
+
+    protected AWSCredentials getCredentials(final ProcessContext context) {
+        final String accessKey = context.getProperty(ACCESS_KEY).getValue();
+        final String secretKey = context.getProperty(SECRET_KEY).getValue();
+
+        final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue();
+
+        if (credentialsFile != null) {
+            try {
+                return new PropertiesCredentials(new File(credentialsFile));
+            } catch (final IOException ioe) {
+                throw new ProcessException("Could not read Credentials File", ioe);
+            }
+        }
+
+        if (accessKey != null && secretKey != null) {
+            return new BasicAWSCredentials(accessKey, secretKey);
+        }
+
+        return new AnonymousAWSCredentials();
+    }
+
+    protected boolean isEmpty(final String value) {
+        return value == null || value.trim().equals("");
+    }
+
+}
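
A note on the design above: AbstractAWSProcessor centralizes credentials, region, and timeout handling, so a concrete processor only supplies its property list and a createClient() implementation. A minimal sketch of such a subclass (hypothetical class, not part of this commit; it uses only the APIs shown above):

    package org.apache.nifi.processors.aws;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;

    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.services.s3.AmazonS3Client;

    // Hypothetical example processor built on AbstractAWSProcessor.
    public class ExampleAWSProcessor extends AbstractAWSProcessor<AmazonS3Client> {

        // Reuse the shared credential/region/timeout descriptors from the base class.
        public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
                Arrays.asList(REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT));

        @Override
        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            return properties;
        }

        @Override
        protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
            // Called from onScheduled(); the base class applies the REGION property afterwards.
            return new AmazonS3Client(credentials, config);
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) {
            // A real processor would use getClient() here to call the service.
        }
    }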

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
new file mode 100644
index 0000000..76880ef
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CanonicalGrantee;
+import com.amazonaws.services.s3.model.EmailAddressGrantee;
+import com.amazonaws.services.s3.model.Grantee;
+import com.amazonaws.services.s3.model.Owner;
+import com.amazonaws.services.s3.model.Permission;
+
+public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3Client> {
+
+    public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new PropertyDescriptor.Builder()
+            .name("FullControl User List")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("A comma-separated list of Amazon User IDs or E-mail addresses that specifies who should have Full Control for an object")
+            .defaultValue("${s3.permissions.full.users}")
+            .build();
+    public static final PropertyDescriptor READ_USER_LIST = new PropertyDescriptor.Builder()
+            .name("Read Permission User List")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("A comma-separated list of Amazon User IDs or E-mail addresses that specifies who should have Read Access for an object")
+            .defaultValue("${s3.permissions.read.users}")
+            .build();
+    public static final PropertyDescriptor WRITE_USER_LIST = new PropertyDescriptor.Builder()
+            .name("Write Permission User List")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("A comma-separated list of Amazon User IDs or E-mail addresses that specifies who should have Write Access for an object")
+            .defaultValue("${s3.permissions.write.users}")
+            .build();
+    public static final PropertyDescriptor READ_ACL_LIST = new PropertyDescriptor.Builder()
+            .name("Read ACL User List")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("A comma-separated list of Amazon User IDs or E-mail addresses that specifies who should have permissions to read the Access Control List for an object")
+            .defaultValue("${s3.permissions.readacl.users}")
+            .build();
+    public static final PropertyDescriptor WRITE_ACL_LIST = new PropertyDescriptor.Builder()
+            .name("Write ACL User List")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("A comma-separated list of Amazon User IDs or E-mail addresses that specifies who should have permissions to change the Access Control List for an object")
+            .defaultValue("${s3.permissions.writeacl.users}")
+            .build();
+    public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The Amazon ID to use for the object's owner")
+            .defaultValue("${s3.owner}")
+            .build();
+    public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder()
+            .name("Bucket")
+            .expressionLanguageSupported(true)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+            .name("Object Key")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("${filename}")
+            .build();
+
+    @Override
+    protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        return new AmazonS3Client(credentials, config);
+    }
+
+    protected Grantee createGrantee(final String value) {
+        if (isEmpty(value)) {
+            return null;
+        }
+
+        if (value.contains("@")) {
+            return new EmailAddressGrantee(value);
+        } else {
+            return new CanonicalGrantee(value);
+        }
+    }
+
+    protected final List<Grantee> createGrantees(final String value) {
+        if (isEmpty(value)) {
+            return Collections.emptyList();
+        }
+
+        final List<Grantee> grantees = new ArrayList<>();
+        final String[] vals = value.split(",");
+        for (final String val : vals) {
+            final String identifier = val.trim();
+            final Grantee grantee = createGrantee(identifier);
+            if (grantee != null) {
+                grantees.add(grantee);
+            }
+        }
+        return grantees;
+    }
+
+    protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) {
+        final AccessControlList acl = new AccessControlList();
+
+        final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
+        if (!isEmpty(ownerId)) {
+            final Owner owner = new Owner();
+            owner.setId(ownerId);
+            acl.setOwner(owner);
+        }
+
+        for (final Grantee grantee : createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
+            acl.grantPermission(grantee, Permission.FullControl);
+        }
+
+        for (final Grantee grantee : createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
+            acl.grantPermission(grantee, Permission.Read);
+        }
+
+        for (final Grantee grantee : createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
+            acl.grantPermission(grantee, Permission.Write);
+        }
+
+        for (final Grantee grantee : createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
+            acl.grantPermission(grantee, Permission.ReadAcp);
+        }
+
+        for (final Grantee grantee : createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) {
+            acl.grantPermission(grantee, Permission.WriteAcp);
+        }
+
+        return acl;
+    }
+}
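
The grantee helpers above use a simple heuristic: any identifier containing '@' becomes an EmailAddressGrantee, anything else a CanonicalGrantee. A small standalone sketch of that behavior (hypothetical demo, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;

    import com.amazonaws.services.s3.model.CanonicalGrantee;
    import com.amazonaws.services.s3.model.EmailAddressGrantee;
    import com.amazonaws.services.s3.model.Grantee;

    public class GranteeParsingDemo {
        public static void main(String[] args) {
            // Mirrors AbstractS3Processor.createGrantees(): split on commas, trim,
            // then choose the grantee type by the presence of '@'.
            final List<Grantee> grantees = new ArrayList<>();
            for (final String val : "abc123canonicalid, ops@example.com".split(",")) {
                final String identifier = val.trim();
                grantees.add(identifier.contains("@")
                        ? new EmailAddressGrantee(identifier) : new CanonicalGrantee(identifier));
            }
            for (final Grantee grantee : grantees) {
                System.out.println(grantee.getClass().getSimpleName() + ": " + grantee.getIdentifier());
            }
        }
    }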

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
new file mode 100644
index 0000000..2406b67
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+
+@SupportsBatching
+@SeeAlso({PutS3Object.class})
+@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
+@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
+@WritesAttributes({
+    @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
+    @WritesAttribute(attribute = "path", description = "The path of the file"),
+    @WritesAttribute(attribute = "absolute.path", description = "The path of the file"),
+    @WritesAttribute(attribute = "filename", description = "The name of the file"),
+    @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the file"),
+    @WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
+    @WritesAttribute(attribute = "mime.type", description = "If S3 provides the content type/MIME type, this attribute will hold that value"),
+    @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
+    @WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
+    @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
+    @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object")})
+public class FetchS3Object extends AbstractS3Processor {
+
+    public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
+            .name("Version")
+            .description("The Version of the Object to download")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
+
+        final AmazonS3 client = getClient();
+        final GetObjectRequest request;
+        if (versionId == null) {
+            request = new GetObjectRequest(bucket, key);
+        } else {
+            request = new GetObjectRequest(bucket, key, versionId);
+        }
+
+        final Map<String, String> attributes = new HashMap<>();
+        try (final S3Object s3Object = client.getObject(request)) {
+            flowFile = session.importFrom(s3Object.getObjectContent(), flowFile);
+            attributes.put("s3.bucket", s3Object.getBucketName());
+
+            final ObjectMetadata metadata = s3Object.getObjectMetadata();
+            if (metadata.getContentDisposition() != null) {
+                final String fullyQualified = metadata.getContentDisposition();
+                final int lastSlash = fullyQualified.lastIndexOf("/");
+                if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) {
+                    attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash));
+                    attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified);
+                    attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1));
+                } else {
+                    attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition());
+                }
+            }
+            if (metadata.getContentMD5() != null) {
+                attributes.put("hash.value", metadata.getContentMD5());
+                attributes.put("hash.algorithm", "MD5");
+            }
+            if (metadata.getContentType() != null) {
+                attributes.put(CoreAttributes.MIME_TYPE.key(), metadata.getContentType());
+            }
+            if (metadata.getETag() != null) {
+                attributes.put("s3.etag", metadata.getETag());
+            }
+            if (metadata.getExpirationTime() != null) {
+                attributes.put("s3.expirationTime", String.valueOf(metadata.getExpirationTime().getTime()));
+            }
+            if (metadata.getExpirationTimeRuleId() != null) {
+                attributes.put("s3.expirationTimeRuleId", metadata.getExpirationTimeRuleId());
+            }
+            if (metadata.getUserMetadata() != null) {
+                attributes.putAll(metadata.getUserMetadata());
+            }
+            if (metadata.getVersionId() != null) {
+                attributes.put("s3.version", metadata.getVersionId());
+            }
+        } catch (final IOException | AmazonClientException ioe) {
+            getLogger().error("Failed to retrieve S3 Object for {} due to {}; routing to failure", new Object[]{flowFile, ioe});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        if (!attributes.isEmpty()) {
+            flowFile = session.putAllAttributes(flowFile, attributes);
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
+        session.getProvenanceReporter().receive(flowFile, "http://" + bucket + ".s3.amazonaws.com/" + key, transferMillis);
+    }
+
+}
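
The nifi-mock TestRunner (already a test-scoped dependency in the processors POM) gives a quick way to exercise this processor. A sketch, assuming a live bucket and a credentials properties file, so this is an integration-style test rather than a pure unit test:

    package org.apache.nifi.processors.aws.s3;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class FetchS3ObjectExampleIT {

        @Test
        public void fetchOneObject() {
            final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class);
            // Placeholder bucket and credentials path; substitute real values to run.
            runner.setProperty(FetchS3Object.BUCKET, "my-test-bucket");
            runner.setProperty(FetchS3Object.CREDENTIALS_FILE, "/path/to/credentials.properties");

            // The Object Key defaults to ${filename}, so supply it as a FlowFile attribute.
            final Map<String, String> attrs = new HashMap<>();
            attrs.put("filename", "hello.txt");
            runner.enqueue(new byte[0], attrs);

            runner.run(1);
            runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
        }
    }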

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
new file mode 100644
index 0000000..24c82dd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.StorageClass;
+
+@SupportsBatching
+@SeeAlso({FetchS3Object.class})
+@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
+@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
+        value = "The value of a User-Defined Metadata field to add to the S3 Object",
+        description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
+        supportsExpressionLanguage = true)
+@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
+@WritesAttributes({
+    @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
+    @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
+    @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set")
+})
+public class PutS3Object extends AbstractS3Processor {
+
+    public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
+            .name("Expiration Time Rule")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
+            .name("Storage Class")
+            .required(true)
+            .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
+            .defaultValue(StorageClass.Standard.name())
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
+                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(true)
+                .dynamic(true)
+                .build();
+    }
+
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        final AmazonS3 s3 = getClient();
+        final FlowFile ff = flowFile;
+        final Map<String, String> attributes = new HashMap<>();
+        try {
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream rawIn) throws IOException {
+                    try (final InputStream in = new BufferedInputStream(rawIn)) {
+                        final ObjectMetadata objectMetadata = new ObjectMetadata();
+                        objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
+                        objectMetadata.setContentLength(ff.getSize());
+
+                        final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
+                        if (expirationRule != null) {
+                            objectMetadata.setExpirationTimeRuleId(expirationRule);
+                        }
+
+                        final Map<String, String> userMetadata = new HashMap<>();
+                        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+                            if (entry.getKey().isDynamic()) {
+                                final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+                                userMetadata.put(entry.getKey().getName(), value);
+                            }
+                        }
+
+                        if (!userMetadata.isEmpty()) {
+                            objectMetadata.setUserMetadata(userMetadata);
+                        }
+
+                        final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+                        request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+                        final AccessControlList acl = createACL(context, ff);
+                        if (acl != null) {
+                            request.setAccessControlList(acl);
+                        }
+
+                        final PutObjectResult result = s3.putObject(request);
+                        if (result.getVersionId() != null) {
+                            attributes.put("s3.version", result.getVersionId());
+                        }
+
+                        attributes.put("s3.etag", result.getETag());
+
+                        final Date expiration = result.getExpirationTime();
+                        if (expiration != null) {
+                            attributes.put("s3.expiration", expiration.toString());
+                        }
+                    }
+                }
+            });
+
+            if (!attributes.isEmpty()) {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+            }
+            session.transfer(flowFile, REL_SUCCESS);
+
+            final String url = "http://"; + bucket + ".s3.amazonaws.com/" + key;
+            final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().send(flowFile, url, millis);
+
+            getLogger().info("Successfully put {} to Amazon S3 in {} 
milliseconds", new Object[]{ff, millis});
+        } catch (final ProcessException | AmazonClientException pe) {
+            getLogger().error("Failed to put {} to Amazon S3 due to {}", new 
Object[]{flowFile, pe});
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}
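
The same TestRunner pattern works for this processor; dynamic properties become user metadata on the stored object. A sketch under the same assumptions (live bucket, placeholder credentials path):

    package org.apache.nifi.processors.aws.s3;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class PutS3ObjectExampleIT {

        @Test
        public void putOneObject() {
            final TestRunner runner = TestRunners.newTestRunner(PutS3Object.class);
            runner.setProperty(PutS3Object.BUCKET, "my-test-bucket");
            runner.setProperty(PutS3Object.CREDENTIALS_FILE, "/path/to/credentials.properties");
            // A dynamic property is stored as user-defined metadata on the S3 object.
            runner.setProperty("x-custom-meta", "example-value");

            final Map<String, String> attrs = new HashMap<>();
            attrs.put("filename", "hello.txt");
            runner.enqueue("hello world".getBytes(), attrs);

            runner.run(1);
            runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
        }
    }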

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
new file mode 100644
index 0000000..5b57647
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sns;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.sns.AmazonSNSClient;
+
+public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> {
+
+    protected static final AllowableValue ARN_TYPE_TOPIC
+            = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic");
+    protected static final AllowableValue ARN_TYPE_TARGET
+            = new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber");
+
+    public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder()
+            .name("Amazon Resource Name (ARN)")
+            .description("The name of the resource to which notifications should be published")
+            .expressionLanguageSupported(true)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder()
+            .name("ARN Type")
+            .description("The type of Amazon Resource Name that is being used.")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET)
+            .defaultValue(ARN_TYPE_TOPIC.getValue())
+            .build();
+
+    @Override
+    protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        return new AmazonSNSClient(credentials, config);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
new file mode 100644
index 0000000..7d42703
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sns;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.sqs.GetSQS;
+import org.apache.nifi.processors.aws.sqs.PutSQS;
+
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.MessageAttributeValue;
+import com.amazonaws.services.sns.model.PublishRequest;
+
+@SupportsBatching
+@SeeAlso({GetSQS.class, PutSQS.class})
+@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"})
+@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service")
+public class PutSNS extends AbstractSNSProcessor {
+
+    public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("The character set in which the FlowFile's content is encoded")
+            .defaultValue("UTF-8")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .required(true)
+            .build();
+    public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder()
+            .name("Use JSON Structure")
+            .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'."
+                    + " Additional elements can be used to send different messages to different protocols. See the Amazon"
+                    + " SNS Documentation for more information.")
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder()
+            .name("E-mail Subject")
+            .description("The optional subject to use for any subscribers that are subscribed via E-mail")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT,
+                    USE_JSON_STRUCTURE, CHARACTER_ENCODING));
+
+    public static final int MAX_SIZE = 256 * 1024;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(true)
+                .required(false)
+                .dynamic(true)
+                .build();
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        if (flowFile.getSize() > MAX_SIZE) {
+            getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue());
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        session.exportTo(flowFile, baos);
+        final String message = new String(baos.toByteArray(), charset);
+
+        final AmazonSNSClient client = getClient();
+        final PublishRequest request = new PublishRequest();
+        request.setMessage(message);
+
+        if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) {
+            request.setMessageStructure("json");
+        }
+
+        final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue();
+        final String arnType = context.getProperty(ARN_TYPE).getValue();
+        if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) {
+            request.setTopicArn(arn);
+        } else {
+            request.setTargetArn(arn);
+        }
+
+        final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue();
+        if (subject != null) {
+            request.setSubject(subject);
+        }
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) {
+                final MessageAttributeValue value = new MessageAttributeValue();
+                value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
+                value.setDataType("String");
+                request.addMessageAttributesEntry(entry.getKey().getName(), value);
+            }
+        }
+
+        try {
+            client.publish(request);
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().send(flowFile, arn);
+            getLogger().info("Successfully published notification for {}", new Object[]{flowFile});
+        } catch (final Exception e) {
+            getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+}
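
A corresponding sketch for PutSNS (placeholder topic ARN and credentials path; it needs a live topic to actually pass):

    package org.apache.nifi.processors.aws.sns;

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class PutSNSExampleIT {

        @Test
        public void publishOneNotification() {
            final TestRunner runner = TestRunners.newTestRunner(PutSNS.class);
            runner.setProperty(PutSNS.ARN, "arn:aws:sns:us-west-2:123456789012:my-topic");
            runner.setProperty(PutSNS.CREDENTIALS_FILE, "/path/to/credentials.properties");
            // Dynamic properties are sent as SNS message attributes of type String.
            runner.setProperty("environment", "test");

            runner.enqueue("FlowFile content becomes the notification body".getBytes());
            runner.run(1);
            runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
        }
    }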

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
new file mode 100644
index 0000000..3cee02d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+
+public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> {
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of messages to send in a single network request")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("25")
+            .build();
+
+    public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder()
+            .name("Queue URL")
+            .description("The URL of the queue to act upon")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+
+    @Override
+    protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        return new AmazonSQSClient(credentials, config);
+    }
+
+}
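
[Editor's note: for orientation, a hedged sketch (not part of this commit) of what a concrete subclass of this base class looks like: it inherits the shared QUEUE_URL/BATCH_SIZE descriptors and the client factory above and supplies only its own onTrigger logic. The class name and log message below are hypothetical.]

    import java.util.Collections;
    import java.util.List;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;

    public class LogQueueUrlProcessor extends AbstractSQSProcessor {

        // Only the inherited Queue URL descriptor is exposed in this sketch.
        private static final List<PropertyDescriptor> PROPERTIES = Collections.singletonList(QUEUE_URL);

        @Override
        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            return PROPERTIES;
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) {
            // A real processor would call getClient() and issue SQS requests here;
            // this sketch only logs the configured queue URL.
            getLogger().info("Configured to act on queue {}", new Object[]{context.getProperty(QUEUE_URL).getValue()});
        }
    }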

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
new file mode 100644
index 0000000..65e020d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+
+@SupportsBatching
+@SeeAlso({GetSQS.class, PutSQS.class})
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
+@CapabilityDescription("Deletes a message from an Amazon Simple Queue Service (SQS) queue")
+public class DeleteSQS extends AbstractSQSProcessor {
+
+    public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder()
+            .name("Receipt Handle")
+            .description("The identifier that specifies the receipt of the message to delete")
+            .expressionLanguageSupported(true)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("${sqs.receipt.handle}")
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, RECEIPT_HANDLE, TIMEOUT));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        List<FlowFile> flowFiles = session.get(1);
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final FlowFile firstFlowFile = flowFiles.get(0);
+        final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue();
+
+        final AmazonSQSClient client = getClient();
+        final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest();
+        request.setQueueUrl(queueUrl);
+
+        final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size());
+
+        for (final FlowFile flowFile : flowFiles) {
+            final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
+            // Each entry in a batch request must carry a unique Id; the FlowFile's UUID serves that purpose.
+            entry.setId(flowFile.getAttribute("uuid"));
+            entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
+            entries.add(entry);
+        }
+
+        request.setEntries(entries);
+
+        try {
+            client.deleteMessageBatch(request);
+            getLogger().info("Successfully deleted {} messages from SQS", new Object[]{flowFiles.size()});
+            session.transfer(flowFiles, REL_SUCCESS);
+        } catch (final Exception e) {
+            getLogger().error("Failed to delete {} messages from SQS due to {}", new Object[]{flowFiles.size(), e});
+            session.transfer(flowFiles, REL_FAILURE);
+        }
+    }
+
+}
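
[Editor's note: a hedged sketch of driving DeleteSQS through the mock framework. The queue URL, credentials, and receipt handle are hypothetical placeholders; in a real flow, GetSQS upstream writes the sqs.receipt.handle attribute that the default Receipt Handle property reads.]

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Ignore;
    import org.junit.Test;

    @Ignore("For local testing only - requires real credentials and a message pending deletion")
    public class TestDeleteSQSSketch {

        @Test
        public void testDelete() {
            final TestRunner runner = TestRunners.newTestRunner(new DeleteSQS());
            // Hypothetical queue URL and credentials.
            runner.setProperty(DeleteSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/example-queue");
            runner.setProperty(DeleteSQS.ACCESS_KEY, "hypothetical-access-key");
            runner.setProperty(DeleteSQS.SECRET_KEY, "hypothetical-secret-key");

            // The default Receipt Handle property evaluates ${sqs.receipt.handle},
            // so the attribute set here determines which message is deleted.
            final Map<String, String> attrs = new HashMap<>();
            attrs.put("sqs.receipt.handle", "hypothetical-receipt-handle");
            runner.enqueue(new byte[0], attrs);
            runner.run(1);
            runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
        }
    }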

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
new file mode 100644
index 0000000..7c2dd2d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+
+@SupportsBatching
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
+@SeeAlso({PutSQS.class, DeleteSQS.class})
+@CapabilityDescription("Fetches messages from an Amazon Simple Queue Service (SQS) queue")
+@WritesAttributes({
+    @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"),
+    @WritesAttribute(attribute = "hash.algorithm", description = "The algorithm used to hash the message body (md5)"),
+    @WritesAttribute(attribute = "sqs.message.id", description = "The unique identifier of the SQS message"),
+    @WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue")
+})
+public class GetSQS extends AbstractSQSProcessor {
+
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("The Character Set that should be used to encode the textual content of the SQS message")
+            .required(true)
+            .defaultValue("UTF-8")
+            .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0]))
+            .build();
+
+    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+            .name("Auto Delete Messages")
+            .description("Specifies whether the messages should be automatically deleted by the processor once they have been received.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Visibility Timeout")
+            .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .defaultValue("15 mins")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of messages to fetch in a single network request")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1L, 10L, true))
+            .defaultValue("10")
+            .build();
+
+    public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(QUEUE_URL)
+            .expressionLanguageSupported(false)
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue();
+
+        final AmazonSQSClient client = getClient();
+
+        final ReceiveMessageRequest request = new ReceiveMessageRequest();
+        request.setAttributeNames(Collections.singleton("All"));
+        request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
+        request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
+        request.setQueueUrl(queueUrl);
+
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+
+        final ReceiveMessageResult result;
+        try {
+            result = client.receiveMessage(request);
+        } catch (final Exception e) {
+            getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e});
+            context.yield();
+            return;
+        }
+
+        final List<Message> messages = result.getMessages();
+        if (messages.isEmpty()) {
+            context.yield();
+            return;
+        }
+
+        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+        for (final Message message : messages) {
+            FlowFile flowFile = session.create();
+
+            final Map<String, String> attributes = new HashMap<>();
+            for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
+                attributes.put("sqs." + entry.getKey(), entry.getValue());
+            }
+
+            for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
+                attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue());
+            }
+
+            attributes.put("hash.value", message.getMD5OfBody());
+            attributes.put("hash.algorithm", "md5");
+            attributes.put("sqs.message.id", message.getMessageId());
+            attributes.put("sqs.receipt.handle", message.getReceiptHandle());
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    out.write(message.getBody().getBytes(charset));
+                }
+            });
+
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowFile, queueUrl);
+
+            getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile});
+        }
+
+        if (autoDelete) {
+            // If we want to auto-delete messages, we must first commit the session to ensure that the data
+            // is persisted in NiFi's repositories.
+            session.commit();
+
+            final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
+            deleteRequest.setQueueUrl(queueUrl);
+            final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
+            for (final Message message : messages) {
+                final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
+                entry.setId(message.getMessageId());
+                entry.setReceiptHandle(message.getReceiptHandle());
+                deleteRequestEntries.add(entry);
+            }
+
+            deleteRequest.setEntries(deleteRequestEntries);
+
+            try {
+                client.deleteMessageBatch(deleteRequest);
+            } catch (final Exception e) {
+                getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages"
+                        + " may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e});
+            }
+        }
+    }
+
+}
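
[Editor's note: in the same spirit as the @Ignore'd S3 test at the end of this commit, a hedged sketch of polling a queue locally. The queue URL and credentials file are placeholders, and the run requires a real queue with messages on it.]

    import java.util.List;
    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Ignore;
    import org.junit.Test;

    @Ignore("For local testing only - requires a configured credentials file and an existing queue")
    public class TestGetSQSSketch {

        @Test
        public void testPoll() {
            final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
            // Hypothetical queue URL and credentials file location.
            runner.setProperty(GetSQS.STATIC_QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/example-queue");
            runner.setProperty(GetSQS.CREDENTAILS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
            // Leave messages on the queue so DeleteSQS can be exercised downstream.
            runner.setProperty(GetSQS.AUTO_DELETE, "false");
            runner.run(1);

            // Each received message becomes one FlowFile with sqs.* attributes set.
            final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
            for (final MockFlowFile flowFile : flowFiles) {
                System.out.println(flowFile.getAttribute("sqs.message.id") + " -> " + flowFile.getAttribute("sqs.receipt.handle"));
            }
        }
    }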

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
new file mode 100644
index 0000000..3961f32
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
+
+@SupportsBatching
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"})
+@SeeAlso({GetSQS.class, DeleteSQS.class})
+@CapabilityDescription("Publishes a message to an Amazon Simple Queue Service (SQS) queue")
+@DynamicProperty(name = "The name of a Message Attribute to add to the message", value = "The value of the Message Attribute",
+        description = "Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "
+        + "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage = true)
+public class PutSQS extends AbstractSQSProcessor {
+
+    public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder()
+            .name("Delay")
+            .description("The amount of time to delay the message before it becomes available to consumers")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 secs")
+            .build();
+
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT));
+
+    private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .expressionLanguageSupported(true)
+                .required(false)
+                .dynamic(true)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .build();
+    }
+
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        userDefinedProperties = new ArrayList<>();
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                userDefinedProperties.add(descriptor);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final AmazonSQSClient client = getClient();
+        final SendMessageBatchRequest request = new SendMessageBatchRequest();
+        final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
+        request.setQueueUrl(queueUrl);
+
+        final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
+
+        final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
+        entry.setId(flowFile.getAttribute("uuid"));
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        session.exportTo(flowFile, baos);
+        // Decode the exported bytes explicitly as UTF-8 rather than relying on the platform-default charset.
+        final String flowFileContent = new String(baos.toByteArray(), StandardCharsets.UTF_8);
+        entry.setMessageBody(flowFileContent);
+
+        final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
+
+        for (final PropertyDescriptor descriptor : userDefinedProperties) {
+            final MessageAttributeValue mav = new MessageAttributeValue();
+            mav.setDataType("String");
+            mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
+            messageAttributes.put(descriptor.getName(), mav);
+        }
+
+        entry.setMessageAttributes(messageAttributes);
+        entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
+        entries.add(entry);
+
+        request.setEntries(entries);
+
+        try {
+            client.sendMessageBatch(request);
+        } catch (final Exception e) {
+            getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        getLogger().info("Successfully published message to Amazon SQS for {}", new Object[]{flowFile});
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        session.getProvenanceReporter().send(flowFile, queueUrl, transmissionMillis);
+    }
+
+}
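
[Editor's note: a hedged sketch of exercising PutSQS, including a dynamic property that becomes an SQS Message Attribute. The queue URL and credentials path are hypothetical placeholders.]

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Ignore;
    import org.junit.Test;

    @Ignore("For local testing only - requires a configured credentials file and an existing queue")
    public class TestPutSQSSketch {

        @Test
        public void testPublish() {
            final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
            // Hypothetical queue URL and credentials file location.
            runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/example-queue");
            runner.setProperty(PutSQS.CREDENTAILS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
            // A dynamic property becomes a Message Attribute named "source" on the SQS message.
            runner.setProperty("source", "${filename}");
            runner.enqueue("Hello from NiFi".getBytes());
            runner.run(1);
            runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
        }
    }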

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..4f2405c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.aws.s3.FetchS3Object
+org.apache.nifi.processors.aws.s3.PutS3Object
+org.apache.nifi.processors.aws.sns.PutSNS
+org.apache.nifi.processors.aws.sqs.GetSQS
+org.apache.nifi.processors.aws.sqs.PutSQS
+org.apache.nifi.processors.aws.sqs.DeleteSQS
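
[Editor's note: this services file is standard java.util.ServiceLoader metadata; NiFi discovers the processors in a NAR by reading it from the bundle's classpath. A rough illustration of the underlying mechanism follows; the class below is hypothetical, but ServiceLoader is the real JDK API.]

    import java.util.ServiceLoader;
    import org.apache.nifi.processor.Processor;

    public class ListAwsProcessors {
        public static void main(String[] args) {
            // ServiceLoader reads META-INF/services/org.apache.nifi.processor.Processor
            // from the classpath and instantiates each implementation listed there.
            for (final Processor processor : ServiceLoader.load(Processor.class)) {
                System.out.println(processor.getClass().getName());
            }
        }
    }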

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
new file mode 100644
index 0000000..0321514
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("For local testing only - interacts with S3 so the credentials file 
must be configured and all necessary buckets created")
+public class TestFetchS3Object {
+
+    private final String CREDENTIALS_FILE = System.getProperty("user.home") + 
"/aws-credentials.properties";
+
+    @Test
+    public void testGet() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
FetchS3Object());
+        runner.setProperty(FetchS3Object.BUCKET, 
"anonymous-test-bucket-00000000");
+        runner.setProperty(FetchS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(FetchS3Object.KEY, "folder/1.txt");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("start", "0");
+
+        runner.enqueue(new byte[0], attrs);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
+        final List<MockFlowFile> ffs = 
runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
+        final MockFlowFile out = ffs.iterator().next();
+
+        final byte[] expectedBytes = 
Files.readAllBytes(Paths.get("src/test/resources/hello.txt"));
+        out.assertContentEquals(new String(expectedBytes));
+        for (final Map.Entry<String, String> entry : 
out.getAttributes().entrySet()) {
+            System.out.println(entry.getKey() + " : " + entry.getValue());
+        }
+    }
+
+}
