This is an automated email from the ASF dual-hosted git repository.

alopresto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new e841f4d  NIFI-4256 Adds AWS Encryption Controller Service. NIFI-4256 
Adds AWS S3 FlowFile encryption attributes, more javadocs, better names.
e841f4d is described below

commit e841f4d5b7da7cf88cbb1805d351bf75ca8a3280
Author: Troy Melhase <t...@troy.io>
AuthorDate: Sat Jul 6 11:59:44 2019 -0800

    NIFI-4256 Adds AWS Encryption Controller Service.
    NIFI-4256 Adds AWS S3 FlowFile encryption attributes, more javadocs,
    better names.
    
    This closes #3574.
    
    Signed-off-by: Andy LoPresto <alopre...@apache.org>
---
 .../nifi/processors/aws/AbstractAWSProcessor.java  |   4 +-
 .../processors/aws/s3/AbstractS3Processor.java     |  31 ++--
 .../aws/s3/AmazonS3EncryptionService.java          |  80 +++++++++
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml    |   5 +
 .../nifi/processors/aws/s3/FetchS3Object.java      |  14 +-
 .../apache/nifi/processors/aws/s3/PutS3Object.java |  37 +++-
 .../ClientSideCMKEncryptionStrategy.java           |  95 ++++++++++
 .../ClientSideKMSEncryptionStrategy.java           |  65 +++++++
 .../aws/s3/encryption/NoOpEncryptionStrategy.java  |  20 +++
 .../aws/s3/encryption/S3EncryptionStrategy.java    |  92 ++++++++++
 .../ServerSideCEKEncryptionStrategy.java           |  75 ++++++++
 .../ServerSideKMSEncryptionStrategy.java           |  43 +++++
 .../encryption/ServerSideS3EncryptionStrategy.java |  36 ++++
 .../s3/encryption/StandardS3EncryptionService.java | 189 ++++++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   4 +-
 .../additionalDetails.html                         |  71 ++++++++
 .../nifi/processors/aws/s3/AbstractS3IT.java       |  27 +++
 .../nifi/processors/aws/s3/ITPutS3Object.java      | 198 ++++++++++++++++++++-
 .../nifi/processors/aws/s3/TestFetchS3Object.java  |   4 +-
 .../nifi/processors/aws/s3/TestPutS3Object.java    |   7 +-
 .../s3/encryption/TestS3EncryptionStrategies.java  | 180 +++++++++++++++++++
 .../TestStandardS3EncryptionService.java           | 102 +++++++++++
 22 files changed, 1347 insertions(+), 32 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 06f4a16..10d19ff 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -157,11 +157,11 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
     private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
     public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = 
ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
 
-    protected static AllowableValue createAllowableValue(final Regions region) 
{
+    public static AllowableValue createAllowableValue(final Regions region) {
         return new AllowableValue(region.getName(), region.getDescription(), 
"AWS Region Code : " + region.getName());
     }
 
-    protected static AllowableValue[] getAvailableRegions() {
+    public static AllowableValue[] getAvailableRegions() {
         final List<AllowableValue> values = new ArrayList<>();
         for (final Regions region : Regions.values()) {
             values.add(createAllowableValue(region));
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 5cc24e9..5e8ff32 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -126,19 +127,34 @@ public abstract class AbstractS3Processor extends 
AbstractAWSCredentialsProvider
                     new AllowableValue("S3SignerType", "Signature v2"))
             .defaultValue("Default Signature")
             .build();
+    public static final PropertyDescriptor ENCRYPTION_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("encryption-service")
+            .displayName("Encryption Service")
+            .description("Specifies the Encryption Service Controller used 
configure requests.  "
+                    + "For backward compatibility, this value is ignored when 
'Server Side Encryption' is set.")
+            .required(false)
+            .identifiesControllerService(AmazonS3EncryptionService.class)
+            .build();
+
     /**
      * Create client using credentials provider. This is the preferred way for 
creating clients
      */
     @Override
     protected AmazonS3Client createClient(final ProcessContext context, final 
AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
         getLogger().info("Creating client with credentials provider");
-
         initializeSignerOverride(context, config);
+        AmazonS3EncryptionService encryptionService = 
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+        AmazonS3Client s3 = null;
 
-        final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, 
config);
+        if (encryptionService != null) {
+            s3 = encryptionService.createEncryptionClient(credentialsProvider, 
config);
+        }
 
-        initalizeEndpointOverride(context, s3);
+        if (s3 == null) {
+            s3 = new AmazonS3Client(credentialsProvider, config);
+        }
 
+        initalizeEndpointOverride(context, s3);
         return s3;
     }
 
@@ -167,14 +183,7 @@ public abstract class AbstractS3Processor extends 
AbstractAWSCredentialsProvider
     @Override
     protected AmazonS3Client createClient(final ProcessContext context, final 
AWSCredentials credentials, final ClientConfiguration config) {
         getLogger().info("Creating client with AWS credentials");
-
-        initializeSignerOverride(context, config);
-
-        final AmazonS3Client s3 = new AmazonS3Client(credentials, config);
-
-        initalizeEndpointOverride(context, s3);
-
-        return s3;
+        return createClient(context, new 
AWSStaticCredentialsProvider(credentials), config);
     }
 
     protected Grantee createGrantee(final String value) {
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java
new file mode 100644
index 0000000..946a1cc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.nifi.controller.ControllerService;
+
+/**
+ * This interface defines how clients interact with an S3 encryption service.
+ *
+ * The S3 processors look an implementation up through their optional
+ * "Encryption Service" property, ask it to build the S3 client, and let it
+ * adjust each request before that request is sent.
+ */
+public interface AmazonS3EncryptionService extends ControllerService {
+
+    /**
+     * Configure a {@link PutObjectRequest} for encryption.
+     * PutS3Object calls this on its single part upload path, right after the
+     * request has been created.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     */
+    void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata 
objectMetadata);
+
+    /**
+     * Configure an {@link InitiateMultipartUploadRequest} for encryption.
+     * PutS3Object calls this only when it starts a new multipart upload, i.e.
+     * when its current state holds no upload id yet.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     */
+    void 
configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, 
ObjectMetadata objectMetadata);
+
+    /**
+     * Configure a {@link GetObjectRequest} for encryption.
+     * FetchS3Object calls this with a newly created {@link ObjectMetadata}
+     * before it issues the getObject call.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     */
+    void configureGetObjectRequest(GetObjectRequest request, ObjectMetadata 
objectMetadata);
+
+    /**
+     * Configure an {@link UploadPartRequest} for encryption.
+     * PutS3Object calls this immediately before it uploads a part.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     */
+    void configureUploadPartRequest(UploadPartRequest request, ObjectMetadata 
objectMetadata);
+
+    /**
+     * Create an S3 encryption client.
+     *
+     * @param credentialsProvider AWS credentials provider.
+     * @param clientConfiguration Client configuration.
+     * @return {@link AmazonS3Client}, perhaps an {@link 
com.amazonaws.services.s3.AmazonS3EncryptionClient}.
+     *         AbstractS3Processor falls back to a plain {@link AmazonS3Client}
+     *         when this returns {@code null}.
+     */
+    AmazonS3Client createEncryptionClient(AWSCredentialsProvider 
credentialsProvider, ClientConfiguration clientConfiguration);
+
+    /**
+     * @return The region associated with the service, as a String.
+     */
+    String getRegion();
+
+    /**
+     * @return The name of the encryption strategy associated with the service.
+     *         FetchS3Object and PutS3Object record this value under the
+     *         "s3.encryptionStrategy" attribute.
+     */
+    String getStrategyName();
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index b6d7931..5e739e1 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -73,6 +73,11 @@
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk-sts</artifactId>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index b66468a..8dabeb9 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -64,7 +64,8 @@ import com.amazonaws.services.s3.model.S3Object;
     @WritesAttribute(attribute = "s3.expirationTime", description = "If the 
file has an expiration date, this attribute will be set, containing the 
milliseconds since epoch in UTC time"),
     @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The 
ID of the rule that dictates this object's expiration time"),
     @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server 
side encryption algorithm of the object"),
-    @WritesAttribute(attribute = "s3.version", description = "The version of 
the S3 object"),})
+    @WritesAttribute(attribute = "s3.version", description = "The version of 
the S3 object"),
+    @WritesAttribute(attribute = "s3.encryptionStrategy", description = "The 
name of the encryption strategy, if any was set"),})
 public class FetchS3Object extends AbstractS3Processor {
 
     public static final PropertyDescriptor VERSION_ID = new 
PropertyDescriptor.Builder()
@@ -89,8 +90,8 @@ public class FetchS3Object extends AbstractS3Processor {
 
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
             Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID,
-                SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, 
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, 
PROXY_PASSWORD,
-                REQUESTER_PAYS));
+                SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, 
ENCRYPTION_SERVICE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST,
+                PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, 
REQUESTER_PAYS));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -120,6 +121,13 @@ public class FetchS3Object extends AbstractS3Processor {
         request.setRequesterPays(requesterPays);
 
         final Map<String, String> attributes = new HashMap<>();
+
+        AmazonS3EncryptionService encryptionService = 
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+        if (encryptionService != null) {
+            encryptionService.configureGetObjectRequest(request, new 
ObjectMetadata());
+            attributes.put("s3.encryptionStrategy", 
encryptionService.getStrategyName());
+        }
+
         try (final S3Object s3Object = client.getObject(request)) {
             flowFile = session.importFrom(s3Object.getObjectContent(), 
flowFile);
             attributes.put("s3.bucket", s3Object.getBucketName());
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 537d269..81c55b6 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -122,8 +122,8 @@ import com.amazonaws.services.s3.model.UploadPartResult;
             "the S3 object, if one is set"),
     @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server 
side encryption algorithm of the object"),
     @WritesAttribute(attribute = "s3.usermetadata", description = "A 
human-readable form of the User Metadata of " +
-            "the S3 object, if any was set")
-})
+            "the S3 object, if any was set"),
+    @WritesAttribute(attribute = "s3.encryptionStrategy", description = "The 
name of the encryption strategy, if any was set"),})
 public class PutS3Object extends AbstractS3Processor {
 
     public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L;
@@ -235,7 +235,8 @@ public class PutS3Object extends AbstractS3Processor {
         Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, 
REMOVE_TAG_PREFIX,
             STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, 
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, 
WRITE_ACL_LIST, OWNER,
             CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, 
SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, 
MULTIPART_S3_AGEOFF_INTERVAL,
-            MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, 
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, 
PROXY_PASSWORD));
+            MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, 
PROXY_CONFIGURATION_SERVICE, PROXY_HOST,
+            PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
 
     final static String S3_BUCKET_KEY = "s3.bucket";
     final static String S3_OBJECT_KEY = "s3.key";
@@ -251,6 +252,8 @@ public class PutS3Object extends AbstractS3Processor {
     final static String S3_API_METHOD_PUTOBJECT = "putobject";
     final static String S3_API_METHOD_MULTIPARTUPLOAD = "multipartupload";
     final static String S3_SSE_ALGORITHM = "s3.sseAlgorithm";
+    final static String S3_ENCRYPTION_STRATEGY = "s3.encryptionStrategy";
+
 
     final static String S3_PROCESS_UNSCHEDULED_MESSAGE = "Processor 
unscheduled, stopping upload";
 
@@ -276,6 +279,7 @@ public class PutS3Object extends AbstractS3Processor {
     protected boolean localUploadExistsInS3(final AmazonS3Client s3, final 
String bucket, final MultipartState localState) {
         ListMultipartUploadsRequest listRequest = new 
ListMultipartUploadsRequest(bucket);
         MultipartUploadListing listing = s3.listMultipartUploads(listRequest);
+
         for (MultipartUpload upload : listing.getMultipartUploads()) {
             if (upload.getUploadId().equals(localState.getUploadId())) {
                 return true;
@@ -473,9 +477,13 @@ public class PutS3Object extends AbstractS3Processor {
                         }
 
                         final String serverSideEncryption = 
context.getProperty(SERVER_SIDE_ENCRYPTION).getValue();
+                        AmazonS3EncryptionService encryptionService = null;
+
                         if 
(!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) {
                             
objectMetadata.setSSEAlgorithm(serverSideEncryption);
                             attributes.put(S3_SSE_ALGORITHM, 
serverSideEncryption);
+                        } else {
+                            encryptionService = 
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
                         }
 
                         if (!userMetadata.isEmpty()) {
@@ -487,12 +495,17 @@ public class PutS3Object extends AbstractS3Processor {
                             // single part upload
                             //----------------------------------------
                             final PutObjectRequest request = new 
PutObjectRequest(bucket, key, in, objectMetadata);
-                            request.setStorageClass(
-                                    
StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+                            if (encryptionService != null) {
+                                
encryptionService.configurePutObjectRequest(request, objectMetadata);
+                                attributes.put(S3_ENCRYPTION_STRATEGY, 
encryptionService.getStrategyName());
+                            }
+
+                            
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
                             final AccessControlList acl = createACL(context, 
ff);
                             if (acl != null) {
                                 request.setAccessControlList(acl);
                             }
+
                             final CannedAccessControlList cannedAcl = 
createCannedACL(context, ff);
                             if (cannedAcl != null) {
                                 request.withCannedAcl(cannedAcl);
@@ -583,9 +596,13 @@ public class PutS3Object extends AbstractS3Processor {
                             // initiate multipart upload or find position in 
file
                             
//------------------------------------------------------------
                             if (currentState.getUploadId().isEmpty()) {
-                                final InitiateMultipartUploadRequest 
initiateRequest =
-                                        new 
InitiateMultipartUploadRequest(bucket, key, objectMetadata);
+                                final InitiateMultipartUploadRequest 
initiateRequest = new InitiateMultipartUploadRequest(bucket, key, 
objectMetadata);
+                                if (encryptionService != null) {
+                                    
encryptionService.configureInitiateMultipartUploadRequest(initiateRequest, 
objectMetadata);
+                                    attributes.put(S3_ENCRYPTION_STRATEGY, 
encryptionService.getStrategyName());
+                                }
                                 
initiateRequest.setStorageClass(currentState.getStorageClass());
+
                                 final AccessControlList acl = 
createACL(context, ff);
                                 if (acl != null) {
                                     initiateRequest.setAccessControlList(acl);
@@ -662,6 +679,9 @@ public class PutS3Object extends AbstractS3Processor {
                                         .withInputStream(in)
                                         .withPartNumber(part)
                                         .withPartSize(thisPartSize);
+                                if (encryptionService != null) {
+                                    
encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata);
+                                }
                                 try {
                                     UploadPartResult uploadPartResult = 
s3.uploadPart(uploadRequest);
                                     
currentState.addPartETag(uploadPartResult.getPartETag());
@@ -686,6 +706,8 @@ public class PutS3Object extends AbstractS3Processor {
                             
//------------------------------------------------------------
                             CompleteMultipartUploadRequest completeRequest = 
new CompleteMultipartUploadRequest(
                                     bucket, key, currentState.getUploadId(), 
currentState.getPartETags());
+
+                            // No call to an encryption service is needed for 
a CompleteMultipartUploadRequest.
                             try {
                                 CompleteMultipartUploadResult completeResult =
                                         
s3.completeMultipartUpload(completeRequest);
@@ -812,6 +834,7 @@ public class PutS3Object extends AbstractS3Processor {
         final String uploadId = upload.getUploadId();
         final AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(
                 bucket, uploadKey, uploadId);
+        // No call to an encryption service is necessary for an 
AbortMultipartUploadRequest.
         try {
             s3.abortMultipartUpload(abortRequest);
             getLogger().info("Aborting out of date multipart upload, bucket {} 
key {} ID {}, initiated {}",
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java
new file mode 100644
index 0000000..d157ea5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3EncryptionClient;
+import com.amazonaws.services.s3.model.CryptoConfiguration;
+import com.amazonaws.services.s3.model.EncryptionMaterials;
+import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.ValidationResult;
+
+import javax.crypto.spec.SecretKeySpec;
+
+/**
+ * This strategy uses a client master key to perform client-side encryption.
+ * Use this strategy when you want the client to perform the encryption
+ * (thus incurring the cost of processing) and when you want to manage the key
+ * material yourself.
+ *
+ * {@link #validateKey(String)} accepts a key only when it is non-blank base64
+ * text that decodes to 16, 24 or 32 bytes; every other value is reported as
+ * invalid.
+ *
+ * See 
https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingClientSideEncryption.html#client-side-encryption-client-side-master-key-intro
+ *
+ */
+public class ClientSideCMKEncryptionStrategy implements S3EncryptionStrategy {
+    /**
+     * Create an encryption client backed by a static AES key.
+     *
+     * The key is checked with {@link #validateKey(String)}, base64-decoded and
+     * wrapped in a {@link StaticEncryptionMaterialsProvider}. A non-blank region
+     * is applied to both the crypto configuration and the returned client; a
+     * blank region leaves both untouched.
+     *
+     * NOTE(review): {@code clientConfiguration} is accepted but is not passed to
+     * the {@link AmazonS3EncryptionClient} constructor below, so whatever
+     * settings it carries presumably never reach this client -- confirm
+     * whether that is intended.
+     *
+     * @param credentialsProvider AWS credentials provider.
+     * @param clientConfiguration Client configuration
+     * @param region AWS region
+     * @param keyIdOrMaterial client master key, always base64 encoded
+     * @return AWS S3 client
+     * @throws SecurityException if the key fails {@link #validateKey(String)}
+     */
+    @Override
+    public AmazonS3Client createEncryptionClient(AWSCredentialsProvider 
credentialsProvider, ClientConfiguration clientConfiguration, String region, 
String keyIdOrMaterial) throws SecurityException {
+        if (!validateKey(keyIdOrMaterial).isValid()) {
+            throw new SecurityException("Invalid client key; ensure key 
material is base64 encoded.");
+        }
+
+        byte[] keyMaterial = Base64.decodeBase64(keyIdOrMaterial);
+        SecretKeySpec symmetricKey = new SecretKeySpec(keyMaterial, "AES");
+        StaticEncryptionMaterialsProvider encryptionMaterialsProvider = new 
StaticEncryptionMaterialsProvider(new EncryptionMaterials(symmetricKey));
+        boolean haveRegion = StringUtils.isNotBlank(region);
+        CryptoConfiguration cryptoConfig = new CryptoConfiguration();
+        Region awsRegion = null;
+
+        if (haveRegion) {
+            awsRegion = Region.getRegion(Regions.fromName(region));
+            cryptoConfig.setAwsKmsRegion(awsRegion);
+        }
+
+        AmazonS3EncryptionClient client = new 
AmazonS3EncryptionClient(credentialsProvider, encryptionMaterialsProvider, 
cryptoConfig);
+        if (haveRegion && awsRegion != null) {
+            client.setRegion(awsRegion);
+        }
+
+        return client;
+    }
+
+    public ValidationResult validateKey(String keyValue) { // valid only for base64 text decoding to 16, 24 or 32 bytes
+        if (StringUtils.isBlank(keyValue) || !Base64.isBase64(keyValue)) {
+            return new ValidationResult.Builder().valid(false).build();
+        }
+
+        boolean decoded = false;
+        boolean sized = false;
+        byte[] keyMaterial;
+
+        try {
+            keyMaterial = Base64.decodeBase64(keyValue);
+            decoded = true;
+            sized = keyMaterial.length == 32 || keyMaterial.length == 24 || 
keyMaterial.length == 16;
+        } catch (final Exception ignored) { // a decode failure leaves both flags false, so the key is reported invalid
+        }
+
+        return new ValidationResult.Builder().valid(decoded && sized).build();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java
new file mode 100644
index 0000000..e6d75e4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3EncryptionClient;
+import com.amazonaws.services.s3.model.CryptoConfiguration;
+import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * This strategy uses a KMS key id to perform client-side encryption.  Use this
+ * strategy when you want the client to perform the encryption (thus incurring
+ * the cost of processing) and manage the key in a KMS instance.
+ *
+ * See 
https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingClientSideEncryption.html#client-side-encryption-kms-managed-master-key-intro
+ *
+ */
+public class ClientSideKMSEncryptionStrategy implements S3EncryptionStrategy {
+    /**
+     * Create an encryption client whose materials come from a
+     * {@link KMSEncryptionMaterialsProvider} built from the given key id.
+     *
+     * A non-blank region is applied to both the crypto configuration and the
+     * returned client; a blank region leaves both untouched.
+     *
+     * NOTE(review): {@code clientConfiguration} is accepted but is not passed to
+     * the {@link AmazonS3EncryptionClient} constructor below, so whatever
+     * settings it carries presumably never reach this client -- confirm
+     * whether that is intended.
+     *
+     * @param credentialsProvider AWS credentials provider.
+     * @param clientConfiguration Client configuration
+     * @param region AWS region
+     * @param keyIdOrMaterial KMS key id
+     * @return AWS S3 client
+     */
+    @Override
+    public AmazonS3Client createEncryptionClient(AWSCredentialsProvider 
credentialsProvider, ClientConfiguration clientConfiguration, String region, 
String keyIdOrMaterial) {
+        KMSEncryptionMaterialsProvider materialProvider = new 
KMSEncryptionMaterialsProvider(keyIdOrMaterial);
+        boolean haveRegion = StringUtils.isNotBlank(region);
+        Region awsRegion = null;
+
+        CryptoConfiguration cryptoConfig = new CryptoConfiguration();
+        if (haveRegion) {
+            awsRegion = Region.getRegion(Regions.fromName(region));
+            cryptoConfig.setAwsKmsRegion(awsRegion);
+        }
+
+        AmazonS3EncryptionClient client = new 
AmazonS3EncryptionClient(credentialsProvider, materialProvider, cryptoConfig);
+        if (haveRegion) {
+            client.setRegion(awsRegion);
+        }
+
+        return client;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/NoOpEncryptionStrategy.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/NoOpEncryptionStrategy.java
new file mode 100644
index 0000000..5307504
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/NoOpEncryptionStrategy.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+/**
+ * Encryption strategy that applies no encryption.  It overrides none of the
+ * {@link S3EncryptionStrategy} methods, so every operation falls through to the
+ * interface defaults.
+ */
+public class NoOpEncryptionStrategy implements S3EncryptionStrategy {
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java
new file mode 100644
index 0000000..677fc0e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.nifi.components.ValidationResult;
+
+/**
+ * This interface defines the API for S3 encryption strategies.  The methods 
have empty defaults
+ * to minimize the burden on implementations.
+ *
+ */
+public interface S3EncryptionStrategy {
+
+    /**
+     * Configure a {@link PutObjectRequest} for encryption.
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     * @param keyValue the key id or key material.
+     */
+    default void configurePutObjectRequest(PutObjectRequest request, 
ObjectMetadata objectMetadata, String keyValue) {
+    }
+
+    /**
+     * Configure an {@link InitiateMultipartUploadRequest} for encryption.
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     * @param keyValue the key id or key material.
+     */
+    default void 
configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, 
ObjectMetadata objectMetadata, String keyValue) {
+    }
+
+    /**
+     * Configure a {@link GetObjectRequest} for encryption.
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     * @param keyValue the key id or key material.
+     */
+    default void configureGetObjectRequest(GetObjectRequest request, 
ObjectMetadata objectMetadata, String keyValue) {
+    }
+
+    /**
+     * Configure an {@link UploadPartRequest} for encryption.
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     * @param keyValue the key id or key material.
+     */
+    default void configureUploadPartRequest(UploadPartRequest request, 
ObjectMetadata objectMetadata, String keyValue) {
+    }
+
+    /**
+     * Create an S3 encryption client.  This default implementation creates nothing and returns {@code null}.
+     *
+     * @param credentialsProvider AWS credentials provider.
+     * @param clientConfiguration Client configuration.
+     * @param region AWS region name handed to the implementation.
+     * @param keyIdOrMaterial the key id or key material.
+     * @return {@link AmazonS3Client}, perhaps an {@link com.amazonaws.services.s3.AmazonS3EncryptionClient};
+     *         {@code null} from this default implementation.
+     * @throws SecurityException declared for implementations; this default implementation never throws it.
+     */
+    default AmazonS3Client createEncryptionClient(AWSCredentialsProvider 
credentialsProvider, ClientConfiguration clientConfiguration, String region, 
String keyIdOrMaterial) throws SecurityException {
+        return null;
+    }
+
+    /**
+     * Validate a key id or key material.
+     *
+     * @param keyValue key id or key material to validate.
+     * @return ValidationResult instance.
+     */
+    default ValidationResult validateKey(String keyValue) {
+        return new ValidationResult.Builder().valid(true).build();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java
new file mode 100644
index 0000000..231a5c8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.SSECustomerKey;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.nifi.components.ValidationResult;
+import org.bouncycastle.util.encoders.Base64;
+
+/**
+ * This strategy uses a customer key to perform server-side encryption.  Use this strategy when you want the server to perform the encryption
+ * (meaning you pay the cost of processing) and when you want to manage the key material yourself.
+ *
+ * See 
https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
+ *
+ */
+public class ServerSideCEKEncryptionStrategy implements S3EncryptionStrategy {
+    @Override
+    public void configurePutObjectRequest(PutObjectRequest request, 
ObjectMetadata objectMetadata, String keyValue) {
+        SSECustomerKey customerKey = new SSECustomerKey(keyValue);
+        request.setSSECustomerKey(customerKey);
+    }
+
+    @Override
+    public void 
configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, 
ObjectMetadata objectMetadata, String keyValue) {
+        SSECustomerKey customerKey = new SSECustomerKey(keyValue);
+        request.setSSECustomerKey(customerKey);
+    }
+
+    @Override
+    public void configureGetObjectRequest(GetObjectRequest request, 
ObjectMetadata objectMetadata, String keyValue) {
+        SSECustomerKey customerKey = new SSECustomerKey(keyValue);
+        request.setSSECustomerKey(customerKey);
+    }
+
+    @Override
+    public void configureUploadPartRequest(UploadPartRequest request, 
ObjectMetadata objectMetadata, String keyValue) {
+        SSECustomerKey customerKey = new SSECustomerKey(keyValue);
+        request.setSSECustomerKey(customerKey);
+    }
+
+    /**
+     * Validate customer-provided key material for SSE-C.
+     *
+     * S3 accepts only a 256-bit AES key for SSE-C, so the value must be the base64 encoding of
+     * exactly 32 bytes.  The key itself is never echoed into the validation message.
+     *
+     * @param keyValue base64-encoded key material to validate.
+     * @return a valid result when the material decodes to a 32-byte key, otherwise an invalid result with an explanation.
+     */
+    @Override
+    public ValidationResult validateKey(String keyValue) {
+        final ValidationResult.Builder result = new ValidationResult.Builder().subject("Key Material");
+
+        if (keyValue == null || keyValue.trim().isEmpty()) {
+            return result.valid(false).explanation("it is empty").build();
+        }
+
+        final byte[] keyMaterial;
+        try {
+            keyMaterial = Base64.decode(keyValue);
+        } catch (final RuntimeException e) {
+            // Bouncy Castle signals malformed input with an unchecked exception.
+            return result.valid(false).explanation("it is not valid base64").build();
+        }
+
+        if (keyMaterial.length != 32) {
+            return result.valid(false).explanation("it must decode to exactly 32 bytes (a 256-bit AES key)").build();
+        }
+
+        return result.valid(true).build();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideKMSEncryptionStrategy.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideKMSEncryptionStrategy.java
new file mode 100644
index 0000000..a31222e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideKMSEncryptionStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+
+/**
+ * This strategy uses a KMS key to perform server-side encryption.  Use this 
strategy when you want the server to perform the encryption,
+ * (meaning you pay the cost of processing) and when you want to use a KMS key.
+ *
+ * See https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
+ *
+ */
+public class ServerSideKMSEncryptionStrategy implements S3EncryptionStrategy {
+    @Override
+    public void configurePutObjectRequest(PutObjectRequest request, 
ObjectMetadata objectMetadata, String keyValue) {
+        SSEAwsKeyManagementParams keyParams = new 
SSEAwsKeyManagementParams(keyValue);
+        request.setSSEAwsKeyManagementParams(keyParams);
+    }
+
+    @Override
+    public void 
configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, 
ObjectMetadata objectMetadata, String keyValue) {
+        SSEAwsKeyManagementParams keyParams = new 
SSEAwsKeyManagementParams(keyValue);
+        request.setSSEAwsKeyManagementParams(keyParams);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
new file mode 100644
index 0000000..0845e12
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+
+
+/**
+ * This strategy uses S3-managed keys to perform server-side encryption.  Use 
this strategy when you want the server to
+ * perform the encryption (meaning you pay the cost of processing) and you 
want AWS to completely manage the key.
+ *
+ *
+ * See 
https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
+ *
+ */
+public class ServerSideS3EncryptionStrategy implements S3EncryptionStrategy {
+    /**
+     * Request SSE-S3 (AES-256) for a single-request upload by setting the algorithm on the object metadata.
+     *
+     * @param request the request to configure (not modified directly).
+     * @param objectMetadata the request metadata that receives the SSE algorithm.
+     * @param keyValue ignored; S3 manages the key.
+     */
+    @Override
+    public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+    }
+
+    /**
+     * Request SSE-S3 (AES-256) for a multipart upload, so that large objects are encrypted
+     * the same way as single-request uploads.
+     *
+     * NOTE(review): like the put variant, this sets the algorithm on {@code objectMetadata} and
+     * assumes the caller attaches that metadata to the request - confirm against the caller.
+     *
+     * @param request the request to configure (not modified directly).
+     * @param objectMetadata the request metadata that receives the SSE algorithm.
+     * @param keyValue ignored; S3 manages the key.
+     */
+    @Override
+    public void configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
new file mode 100644
index 0000000..21d8d9e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
+import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+@Tags({"service", "encryption", "encrypt", "decryption", "decrypt", "key"})
+@CapabilityDescription("Adds configurable encryption to S3 Put and S3 Fetch 
operations.")
+public class StandardS3EncryptionService extends AbstractControllerService 
implements AmazonS3EncryptionService {
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardS3EncryptionService.class);
+
+    public static final String STRATEGY_NAME_NONE = "NONE";
+    public static final String STRATEGY_NAME_SSE_S3 = "SSE_S3";
+    public static final String STRATEGY_NAME_SSE_KMS = "SSE_KMS";
+    public static final String STRATEGY_NAME_SSE_C = "SSE_C";
+    public static final String STRATEGY_NAME_CSE_KMS = "CSE_KMS";
+    public static final String STRATEGY_NAME_CSE_CMK = "CSE_CMK";
+
+    // Strategy implementations keyed by the ENCRYPTION_STRATEGY property value.  Populated in a
+    // static initializer rather than with double-brace initialization (which creates an anonymous
+    // HashMap subclass) and wrapped read-only because the map is shared by every service instance.
+    private static final Map<String, S3EncryptionStrategy> namedStrategies;
+
+    static {
+        final Map<String, S3EncryptionStrategy> strategies = new HashMap<>();
+        strategies.put(STRATEGY_NAME_NONE, new NoOpEncryptionStrategy());
+        strategies.put(STRATEGY_NAME_SSE_S3, new ServerSideS3EncryptionStrategy());
+        strategies.put(STRATEGY_NAME_SSE_KMS, new ServerSideKMSEncryptionStrategy());
+        strategies.put(STRATEGY_NAME_SSE_C, new ServerSideCEKEncryptionStrategy());
+        strategies.put(STRATEGY_NAME_CSE_KMS, new ClientSideKMSEncryptionStrategy());
+        strategies.put(STRATEGY_NAME_CSE_CMK, new ClientSideCMKEncryptionStrategy());
+        namedStrategies = Collections.unmodifiableMap(strategies);
+    }
+
+    private static final AllowableValue NONE = new 
AllowableValue(STRATEGY_NAME_NONE, "None","No encryption.");
+    private static final AllowableValue SSE_S3 = new 
AllowableValue(STRATEGY_NAME_SSE_S3, "Server-side S3","Use server-side, 
S3-managed encryption.");
+    private static final AllowableValue SSE_KMS = new 
AllowableValue(STRATEGY_NAME_SSE_KMS, "Server-side KMS","Use server-side, KMS 
key to perform encryption.");
+    private static final AllowableValue SSE_C = new 
AllowableValue(STRATEGY_NAME_SSE_C, "Server-side Customer Key","Use 
server-side, customer-supplied key for encryption.");
+    private static final AllowableValue CSE_KMS = new 
AllowableValue(STRATEGY_NAME_CSE_KMS, "Client-side KMS","Use client-side, KMS 
key to perform encryption.");
+    private static final AllowableValue CSE_CMK = new 
AllowableValue(STRATEGY_NAME_CSE_CMK, "Client-side Customer Master Key","Use 
client-side, customer-supplied master key to perform encryption.");
+
+    public static final PropertyDescriptor ENCRYPTION_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("encryption-strategy")
+            .displayName("Encryption Strategy")
+            .description("Strategy to use for S3 data encryption and 
decryption.")
+            .allowableValues(NONE, SSE_S3, SSE_KMS, SSE_C, CSE_KMS, CSE_CMK)
+            .required(true)
+            .defaultValue(NONE.getValue())
+            .build();
+
+    public static final PropertyDescriptor ENCRYPTION_VALUE = new 
PropertyDescriptor.Builder()
+            .name("key-id-or-key-material")
+            .displayName("Key ID or Key Material")
+            .description("For Server-side CEK and Client-side CMK, this is 
base64-encoded Key Material.  For all others (except 'None'), it is the KMS Key 
ID.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(new StandardValidators.StringLengthValidator(0, 
4096))
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    // The property name stays "region" so existing flows keep resolving it; only the
+    // user-facing label and help text are added.
+    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+            .name("region")
+            .displayName("Region")
+            .description("KMS key region, if any.  This value must match the actual region of the KMS key if supplied.")
+            .required(false)
+            .allowableValues(AbstractS3Processor.getAvailableRegions())
+            .defaultValue(AbstractS3Processor.createAllowableValue(Regions.DEFAULT_REGION).getValue())
+            .build();
+
+    private String keyValue = "";
+    private String region = "";
+    private S3EncryptionStrategy encryptionStrategy = new 
NoOpEncryptionStrategy();
+    private String strategyName = STRATEGY_NAME_NONE;
+
+    /**
+     * Capture the configured strategy, key value and region when the service is enabled.
+     * The fields are only updated once the strategy name has resolved, so a failed enable
+     * leaves the previous state intact.
+     *
+     * @param context configuration of this service.
+     * @throws InitializationException if the configured strategy name has no registered implementation.
+     */
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) throws InitializationException {
+        final String newStrategyName = context.getProperty(ENCRYPTION_STRATEGY).getValue();
+        final String newKeyValue = context.getProperty(ENCRYPTION_VALUE).getValue();
+        final S3EncryptionStrategy newEncryptionStrategy = namedStrategies.get(newStrategyName);
+        String newRegion = null;
+
+        if (context.getProperty(REGION) != null ) {
+            newRegion = context.getProperty(REGION).getValue();
+        }
+
+        if (newEncryptionStrategy == null) {
+            // Report the name that failed to resolve, not the previously active strategy.
+            final String msg = "No encryption strategy found for name: " + newStrategyName;
+            logger.warn(msg);
+            throw new InitializationException(msg);
+        }
+
+        strategyName = newStrategyName;
+        encryptionStrategy = newEncryptionStrategy;
+        keyValue = newKeyValue;
+        region = newRegion;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        Collection<ValidationResult> validationResults = new ArrayList<>();
+        
validationResults.add(encryptionStrategy.validateKey(validationContext.getProperty(ENCRYPTION_VALUE).getValue()));
+        return validationResults;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(ENCRYPTION_STRATEGY);
+        properties.add(ENCRYPTION_VALUE);
+        properties.add(REGION);
+        return Collections.unmodifiableList(properties);
+    }
+
+    @Override
+    public void configurePutObjectRequest(PutObjectRequest request, 
ObjectMetadata objectMetadata) {
+        encryptionStrategy.configurePutObjectRequest(request, objectMetadata, 
keyValue);
+    }
+
+    @Override
+    public void 
configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, 
ObjectMetadata objectMetadata) {
+        encryptionStrategy.configureInitiateMultipartUploadRequest(request, 
objectMetadata, keyValue);
+    }
+
+    @Override
+    public void configureGetObjectRequest(GetObjectRequest request, 
ObjectMetadata objectMetadata) {
+        encryptionStrategy.configureGetObjectRequest(request, objectMetadata, 
keyValue);
+    }
+
+    @Override
+    public void configureUploadPartRequest(UploadPartRequest request, 
ObjectMetadata objectMetadata) {
+        encryptionStrategy.configureUploadPartRequest(request, objectMetadata, 
keyValue);
+    }
+
+    @Override
+    public AmazonS3Client createEncryptionClient(AWSCredentialsProvider 
credentialsProvider, ClientConfiguration clientConfiguration) {
+        return encryptionStrategy.createEncryptionClient(credentialsProvider, 
clientConfiguration, region, keyValue);
+    }
+
+    @Override
+    public String getRegion() {
+        return region;
+    }
+
+    @Override
+    public String getStrategyName() {
+        return strategyName;
+    }
+}
+
+
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 5e2dea4..390d4c9 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,4 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService
\ No newline at end of file
+org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService
+
+org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html
new file mode 100644
index 0000000..108a0b7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html
@@ -0,0 +1,71 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>S3EncryptionService</title>
+
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+</head>
+<body>
+
+<div>
+    The <code>S3EncryptionService</code> manages an encryption strategy and 
applies that strategy to various S3 operations.
+
+    <br>
+
+    <b>Note:</b> this service has no effect when a processor has the 
<code>SERVER_SIDE_ENCRYPTION</code> property set.  To use
+    this service with processors so configured, first create a service 
instance, set the <code>Encryption Strategy</code> to <code>Server-side 
S3</code>,
+    disable the <code>SERVER_SIDE_ENCRYPTION</code> processor setting, and 
finally, associate the processor with the service.
+</div>
+
+
+<h2>Configuration Details</h2>
+<h3>Encryption Strategy</h3>
+
+<div>
+    The name of the specific encryption strategy for this service to use when 
encrypting and decrypting S3 operations.
+
+    <ul>
+        <li><code>None</code> - no encryption is configured or applied.</li>
+        <li><code>Server-side S3</code> - encryption and decryption are managed by S3; no keys are required.</li>
+        <li><code>Server-side KMS</code> - encryption and decryption are 
performed by S3 using the configured KMS key.</li>
+        <li><code>Server-side Customer Key</code> - encryption and decryption 
are performed by S3 using the supplied customer key.</li>
+        <li><code>Client-side KMS</code> - like the server-side KMS strategy, 
with the encryption and decryption performed by the client.</li>
+        <li><code>Client-side Customer Master Key</code> - like the server-side customer key strategy, with the encryption and decryption performed by the client.</li>
+    </ul>
+</div>
+
+<h3>Key ID or Key Material</h3>
+<p>
+    When configured for either the server-side or client-side KMS strategy, 
this field should contain the ID or alias
+    of that key.
+</p>
+<p>
+    When configured for either the server-side or client-side customer key 
strategies, this field should contain the key
+    material, and that material must be base64 encoded.
+</p>
+<p>
+    All other encryption strategies ignore this field.
+</p>
+
+<h3>Region</h3>
+<div>
+    KMS key region, if any.  This value must match the actual region of the 
KMS key if supplied.
+</div>
+
+</body>
+</html>
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index 00f8567..0860085 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -17,6 +17,15 @@
 package org.apache.nifi.processors.aws.s3;
 
 import com.amazonaws.auth.PropertiesCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kms.AWSKMS;
+import com.amazonaws.services.kms.AWSKMSClient;
+import com.amazonaws.services.kms.model.CreateKeyRequest;
+import com.amazonaws.services.kms.model.CreateKeyResult;
+import com.amazonaws.services.kms.model.GenerateDataKeyRequest;
+import com.amazonaws.services.kms.model.GenerateDataKeyResult;
+import com.amazonaws.services.kms.model.ScheduleKeyDeletionRequest;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CreateBucketRequest;
@@ -67,6 +76,7 @@ public abstract class AbstractS3IT {
 
     // Static so multiple Tests can use same client
     protected static AmazonS3Client client;
+    protected static AWSKMS kmsClient;
 
     @BeforeClass
     public static void oneTimeSetup() {
@@ -82,6 +92,8 @@ public abstract class AbstractS3IT {
         try {
             final PropertiesCredentials credentials = new 
PropertiesCredentials(fis);
             client = new AmazonS3Client(credentials);
+            kmsClient = new AWSKMSClient(credentials);
+            kmsClient.setRegion(Region.getRegion(Regions.fromName(REGION)));
 
             if (client.doesBucketExist(BUCKET_NAME)) {
                 fail("Bucket " + BUCKET_NAME + " exists. Choose a different 
bucket name to continue test");
@@ -185,4 +197,19 @@ public abstract class AbstractS3IT {
 
         return new File(uri);
     }
+
+    /**
+     * Creates a new KMS key (described as "CMK for unit tests"), generates an AES_128 data key
+     * under it, and returns the key id reported by the data-key result.
+     *
+     * Every call creates a new key through {@code kmsClient}; the plaintext and encrypted
+     * data key returned by KMS are not used.
+     *
+     * @return the key id from the generate-data-key result.
+     */
+    protected static String getKMSKey() {
+        CreateKeyRequest cmkRequest = new 
CreateKeyRequest().withDescription("CMK for unit tests");
+        CreateKeyResult cmkResult = kmsClient.createKey(cmkRequest);
+
+        GenerateDataKeyRequest dekRequest = new 
GenerateDataKeyRequest().withKeyId(cmkResult.getKeyMetadata().getKeyId()).withKeySpec("AES_128");
+        GenerateDataKeyResult dekResult = 
kmsClient.generateDataKey(dekRequest);
+
+        return dekResult.getKeyId();
+    }
+
+    /**
+     * Schedules the given KMS key for deletion with a 7-day pending window.
+     *
+     * @param keyId id of the key to schedule for deletion.
+     */
+    protected static void deleteKMSKey(String keyId) {
+        ScheduleKeyDeletionRequest req = new 
ScheduleKeyDeletionRequest().withKeyId(keyId).withPendingWindowInDays(7);
+        kmsClient.scheduleKeyDeletion(req);
+    }
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index 1bbfa28..7a2cf3c 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -21,6 +21,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -33,20 +34,28 @@ import 
com.amazonaws.services.s3.model.GetObjectTaggingResult;
 import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.Tag;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
 import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
+import 
org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockPropertyValue;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -58,11 +67,17 @@ import 
com.amazonaws.services.s3.model.MultipartUploadListing;
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.Region;
 import com.amazonaws.services.s3.model.StorageClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Provides integration level testing with actual AWS S3 resources for {@link 
PutS3Object} and requires additional configuration and resources to work.
  */
 public class ITPutS3Object extends AbstractS3IT {
+    private static final Logger logger = 
LoggerFactory.getLogger(ITPutS3Object.class);
 
     final static String TEST_ENDPOINT = "https://endpoint.com";
     //    final static String TEST_TRANSIT_URI = "https://" + BUCKET_NAME + ".endpoint.com";
@@ -74,6 +89,28 @@ public class ITPutS3Object extends AbstractS3IT {
 
     final static Pattern reS3ETag = 
Pattern.compile("[0-9a-fA-f]{32,32}(-[0-9]+)?");
 
+
+    private static String kmsKeyId = "";
+    private static String randomKeyMaterial = "";
+
+
+    /**
+     * Prepares the class-wide key inputs: 32 random bytes, Base64-encoded, as raw key
+     * material, and a key id obtained from {@code getKMSKey()}.
+     */
+    @BeforeClass
+    public static void setupClass() {
+        final byte[] rawKey = new byte[32];
+        new SecureRandom().nextBytes(rawKey);
+        randomKeyMaterial = Base64.encodeBase64String(rawKey);
+
+        kmsKeyId = getKMSKey();
+    }
+
+    /**
+     * Schedules deletion of the KMS key recorded in {@code kmsKeyId}; does nothing
+     * when no key id was recorded.
+     */
+    @AfterClass
+    public static void teardownClass() {
+        if (StringUtils.isEmpty(kmsKeyId)) {
+            return;
+        }
+        deleteKMSKey(kmsKeyId);
+    }
+
     @Test
     public void testSimplePut() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
@@ -185,7 +222,6 @@ public class ITPutS3Object extends AbstractS3IT {
 
     }
 
-
     @Test
     public void testPutThenFetchWithoutSSE() throws IOException {
         testPutThenFetch(PutS3Object.NO_SERVER_SIDE_ENCRYPTION);
@@ -827,7 +863,6 @@ public class ITPutS3Object extends AbstractS3IT {
         Assert.assertTrue(ff1.getSize() > S3_MAXIMUM_OBJECT_SIZE);
     }
 
-    @Ignore
     @Test
     public void testS3MultipartAgeoff() throws InterruptedException, 
IOException {
         final PutS3Object processor = new PutS3Object();
@@ -910,6 +945,165 @@ public class ITPutS3Object extends AbstractS3IT {
         Assert.assertEquals("true", objectTags.get(0).getValue());
     }
 
+    /**
+     * Puts the sample file through an encryption service configured with the SSE_S3 strategy
+     * (no key value), then fetches it back and checks the content, the SSE algorithm attribute
+     * and the encryption-strategy attribute.
+     */
+    @Test
+    public void testEncryptionServiceWithServerSideS3ManagedEncryptionStrategy() throws IOException, InitializationException {
+        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, "");
+
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
+        MockFlowFile putSuccess = flowFiles.get(0);
+        // assertEquals takes (expected, actual); this order keeps the failure message truthful.
+        Assert.assertEquals(StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
+
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, "");
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_S3);
+    }
+
+    /**
+     * Puts the sample file through an encryption service configured with the SSE_KMS strategy
+     * and the class-wide KMS key id, then fetches it back and checks the content, the
+     * {@code aws:kms} SSE algorithm attribute and the encryption-strategy attribute.
+     */
+    @Test
+    public void testEncryptionServiceWithServerSideKMSEncryptionStrategy() throws IOException, InitializationException {
+        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
+        MockFlowFile putSuccess = flowFiles.get(0);
+        // assertEquals takes (expected, actual); this order keeps the failure message truthful.
+        Assert.assertEquals(StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
+
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, "aws:kms");
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS);
+    }
+
+    /**
+     * Puts the sample file through an encryption service configured with the SSE_C strategy
+     * and the class-wide random key material, then fetches it back with the same key and
+     * checks the content and the encryption-related attributes.
+     */
+    @Test
+    public void testEncryptionServiceWithServerSideCPEKEncryptionStrategy() throws IOException, InitializationException {
+        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
+        MockFlowFile putSuccess = flowFiles.get(0);
+        // assertEquals takes (expected, actual); this order keeps the failure message truthful.
+        Assert.assertEquals(StandardS3EncryptionService.STRATEGY_NAME_SSE_C, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
+
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        // successful fetch does not indicate type of original encryption:
+        flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, null);
+        // but it does indicate it via our specific attribute:
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_C);
+    }
+
+    /**
+     * Puts the sample file through an encryption service configured with the CSE_KMS strategy
+     * and the class-wide KMS key id, then fetches it back and checks the content, the
+     * {@code x-amz-wrap-alg} attribute and the encryption-strategy attribute.
+     */
+    @Test
+    public void testEncryptionServiceWithClientSideKMSEncryptionStrategy() throws InitializationException, IOException {
+        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
+        MockFlowFile putSuccess = flowFiles.get(0);
+        // assertEquals takes (expected, actual); this order keeps the failure message truthful.
+        Assert.assertEquals(StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
+
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        flowFile.assertAttributeEquals("x-amz-wrap-alg", "kms");
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS);
+    }
+
+    /**
+     * Puts the sample file through an encryption service configured with the CSE_CMK strategy
+     * and the class-wide random key material, then fetches it back and checks the content,
+     * the encryption-strategy attribute, and that {@code x-amz-key} and {@code x-amz-iv}
+     * are present and non-empty.
+     */
+    @Test
+    public void testEncryptionServiceWithClientSideCMKEncryptionStrategy() throws InitializationException, IOException {
+        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, randomKeyMaterial);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
+        MockFlowFile putSuccess = flowFiles.get(0);
+        // assertEquals takes (expected, actual); this order keeps the failure message truthful.
+        Assert.assertEquals(StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
+
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, randomKeyMaterial);
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK);
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        flowFile.assertAttributeExists("x-amz-key");
+        flowFile.assertAttributeNotEquals("x-amz-key", "");
+
+        flowFile.assertAttributeExists("x-amz-iv");
+        flowFile.assertAttributeNotEquals("x-amz-iv", "");
+    }
+
+    /**
+     * Builds a test runner around a new {@link PutS3Object} by delegating to
+     * {@code createEncryptionTestRunner} with the given strategy name and key value.
+     */
+    private static TestRunner createPutEncryptionTestRunner(String strategyName, String keyIdOrMaterial) throws InitializationException {
+        final Processor putProcessor = new PutS3Object();
+        return createEncryptionTestRunner(putProcessor, strategyName, keyIdOrMaterial);
+    }
+
+    /**
+     * Runs a {@link FetchS3Object} configured with the given encryption strategy and key value
+     * against an empty flow file carrying {@code attributes}, asserts that exactly one flow file
+     * reached the success relationship, and returns it.
+     */
+    private static MockFlowFile fetchEncryptedFlowFile(Map<String, String> attributes, String strategyName, String keyIdOrMaterial) throws InitializationException {
+        final TestRunner fetchRunner = createEncryptionTestRunner(new FetchS3Object(), strategyName, keyIdOrMaterial);
+
+        final byte[] emptyContent = new byte[0];
+        fetchRunner.enqueue(emptyContent, attributes);
+        fetchRunner.run(1);
+        fetchRunner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
+
+        return fetchRunner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS).get(0);
+    }
+
+    /**
+     * Creates a test runner for {@code processor} with a {@link StandardS3EncryptionService}
+     * registered, configured with the given strategy name, key value and region, and enabled.
+     * The service is also configured directly through a mocked {@link ConfigurationContext}
+     * before it is enabled on the runner.
+     */
+    private static TestRunner createEncryptionTestRunner(Processor processor, String strategyName, String keyIdOrMaterial) throws InitializationException {
+        final TestRunner testRunner = TestRunners.newTestRunner(processor);
+        final StandardS3EncryptionService encryptionService = new StandardS3EncryptionService();
+
+        // Mocked context handed to onConfigured(); it answers only the three service properties.
+        final ConfigurationContext mockContext = mock(ConfigurationContext.class);
+        when(mockContext.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
+        when(mockContext.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
+        when(mockContext.getProperty(StandardS3EncryptionService.REGION)).thenReturn(new MockPropertyValue(REGION));
+
+        testRunner.addControllerService(PutS3Object.ENCRYPTION_SERVICE.getName(), encryptionService);
+
+        // Processor-level properties.
+        testRunner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        testRunner.setProperty(PutS3Object.REGION, REGION);
+        testRunner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        testRunner.setProperty(PutS3Object.ENCRYPTION_SERVICE, encryptionService.getIdentifier());
+
+        // Service-level properties.
+        testRunner.setProperty(encryptionService, StandardS3EncryptionService.ENCRYPTION_STRATEGY, strategyName);
+        testRunner.setProperty(encryptionService, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
+        testRunner.setProperty(encryptionService, StandardS3EncryptionService.REGION, REGION);
+
+        encryptionService.onConfigured(mockContext);
+        testRunner.enableControllerService(encryptionService);
+
+        return testRunner;
+    }
+
     private class MockAmazonS3Client extends AmazonS3Client {
         MultipartUploadListing listing;
         public void setListing(MultipartUploadListing newlisting) {
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 4326ed7..88ccfb0 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -24,7 +24,6 @@ import java.util.Map;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.proxy.ProxyConfigurationService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -252,7 +251,8 @@ public class TestFetchS3Object {
         assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE));
         assertTrue(pd.contains(FetchS3Object.TIMEOUT));
         assertTrue(pd.contains(FetchS3Object.VERSION_ID));
-        
assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
+        assertTrue(pd.contains(FetchS3Object.ENCRYPTION_SERVICE));
+        assertTrue(pd.contains(FetchS3Object.PROXY_CONFIGURATION_SERVICE));
         assertTrue(pd.contains(FetchS3Object.PROXY_HOST));
         assertTrue(pd.contains(FetchS3Object.PROXY_HOST_PORT));
         assertTrue(pd.contains(FetchS3Object.PROXY_USERNAME));
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index 6bb750f..6ba6de8 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -29,7 +29,6 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.proxy.ProxyConfigurationService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -210,7 +209,7 @@ public class TestPutS3Object {
     public void testGetPropertyDescriptors() {
         PutS3Object processor = new PutS3Object();
         List<PropertyDescriptor> pd = 
processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 33, pd.size());
+        assertEquals("size should be eq", 34, pd.size());
         assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
         assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
         assertTrue(pd.contains(PutS3Object.BUCKET));
@@ -232,7 +231,8 @@ public class TestPutS3Object {
         assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST));
         assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
         assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
-        
assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
+        assertTrue(pd.contains(PutS3Object.ENCRYPTION_SERVICE));
+        assertTrue(pd.contains(PutS3Object.PROXY_CONFIGURATION_SERVICE));
         assertTrue(pd.contains(PutS3Object.PROXY_HOST));
         assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT));
         assertTrue(pd.contains(PutS3Object.PROXY_USERNAME));
@@ -245,5 +245,4 @@ public class TestPutS3Object {
         assertTrue(pd.contains(PutS3Object.MULTIPART_S3_AGEOFF_INTERVAL));
         assertTrue(pd.contains(PutS3Object.MULTIPART_S3_MAX_AGE));
     }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java
new file mode 100644
index 0000000..3261288
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.commons.codec.binary.Base64;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.SecureRandom;
+
+
+/**
+ * Unit tests for the {@link S3EncryptionStrategy} implementations. Each test checks whether
+ * the strategy builds an encryption client and which fields it sets on the S3 request and
+ * metadata objects.
+ */
+public class TestS3EncryptionStrategies {
+
+    private String randomKeyMaterial = "";
+    private String randomKeyId = "mock-key-id";
+    private String region = "us-west-1";
+
+    private ObjectMetadata metadata = null;
+    private PutObjectRequest putObjectRequest = null;
+    private InitiateMultipartUploadRequest initUploadRequest = null;
+    private GetObjectRequest getObjectRequest = null;
+    private UploadPartRequest uploadPartRequest = null;
+
+    /**
+     * Generates fresh Base64-encoded key material from 32 random bytes and creates new,
+     * unconfigured metadata and request objects before every test.
+     */
+    @Before
+    public void setup() {
+        byte[] keyRawBytes = new byte[32];
+        SecureRandom secureRandom = new SecureRandom();
+        secureRandom.nextBytes(keyRawBytes);
+        randomKeyMaterial = Base64.encodeBase64String(keyRawBytes);
+
+        metadata = new ObjectMetadata();
+        putObjectRequest = new PutObjectRequest("", "", "");
+        initUploadRequest = new InitiateMultipartUploadRequest("", "");
+        getObjectRequest = new GetObjectRequest("", "");
+        uploadPartRequest = new UploadPartRequest();
+    }
+
+    /** Checks that the client-side KMS strategy builds a client and that requests/metadata stay untouched. */
+    @Test
+    public void testClientSideKMSEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ClientSideKMSEncryptionStrategy();
+
+        // This shows that the strategy builds a client:
+        Assert.assertNotNull(strategy.createEncryptionClient(null, null, region, randomKeyMaterial));
+
+        // This shows that the strategy does not modify the metadata or any of the requests:
+        Assert.assertNull(metadata.getSSEAlgorithm());
+        Assert.assertNull(putObjectRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+
+        Assert.assertNull(initUploadRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(initUploadRequest.getSSECustomerKey());
+
+        Assert.assertNull(getObjectRequest.getSSECustomerKey());
+
+        Assert.assertNull(uploadPartRequest.getSSECustomerKey());
+    }
+
+    /** Checks that the client-side CMK strategy builds a client and that requests/metadata stay untouched. */
+    @Test
+    public void testClientSideCMKEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ClientSideCMKEncryptionStrategy();
+
+        // This shows that the strategy builds a client:
+        Assert.assertNotNull(strategy.createEncryptionClient(null, null, region, randomKeyMaterial));
+
+        // This shows that the strategy does not modify the metadata or any of the requests:
+        Assert.assertNull(metadata.getSSEAlgorithm());
+        Assert.assertNull(putObjectRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+
+        Assert.assertNull(initUploadRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(initUploadRequest.getSSECustomerKey());
+
+        Assert.assertNull(getObjectRequest.getSSECustomerKey());
+
+        Assert.assertNull(uploadPartRequest.getSSECustomerKey());
+    }
+
+    /**
+     * Checks that the server-side customer-key strategy builds no client and sets the SSE
+     * customer key on each of the four request types it configures.
+     */
+    @Test
+    public void testServerSideCEKEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ServerSideCEKEncryptionStrategy();
+
+        // This shows that the strategy does *not* build a client:
+        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+
+        // This shows that the strategy sets the SSE customer key as expected:
+        strategy.configurePutObjectRequest(putObjectRequest, metadata, randomKeyMaterial);
+        Assert.assertEquals(randomKeyMaterial, putObjectRequest.getSSECustomerKey().getKey());
+        Assert.assertNull(putObjectRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        // Same for InitiateMultipartUploadRequest:
+        strategy.configureInitiateMultipartUploadRequest(initUploadRequest, metadata, randomKeyMaterial);
+        Assert.assertEquals(randomKeyMaterial, initUploadRequest.getSSECustomerKey().getKey());
+        Assert.assertNull(initUploadRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        // Same for GetObjectRequest. The assertion must inspect getObjectRequest itself;
+        // checking initUploadRequest here would pass even if the get request were left unconfigured.
+        strategy.configureGetObjectRequest(getObjectRequest, metadata, randomKeyMaterial);
+        Assert.assertEquals(randomKeyMaterial, getObjectRequest.getSSECustomerKey().getKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        // Same for UploadPartRequest:
+        strategy.configureUploadPartRequest(uploadPartRequest, metadata, randomKeyMaterial);
+        Assert.assertEquals(randomKeyMaterial, uploadPartRequest.getSSECustomerKey().getKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+    }
+
+    /**
+     * Checks that the server-side KMS strategy builds no client and sets the KMS key id on
+     * put and initiate-multipart-upload requests.
+     */
+    @Test
+    public void testServerSideKMSEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ServerSideKMSEncryptionStrategy();
+
+        // This shows that the strategy does *not* build a client:
+        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+
+        // This shows that the strategy sets the SSE KMS key id as expected:
+        strategy.configurePutObjectRequest(putObjectRequest, metadata, randomKeyId);
+        Assert.assertEquals(randomKeyId, putObjectRequest.getSSEAwsKeyManagementParams().getAwsKmsKeyId());
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        // Same for InitiateMultipartUploadRequest:
+        strategy.configureInitiateMultipartUploadRequest(initUploadRequest, metadata, randomKeyId);
+        Assert.assertEquals(randomKeyId, initUploadRequest.getSSEAwsKeyManagementParams().getAwsKmsKeyId());
+        Assert.assertNull(initUploadRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+    }
+
+    /** Checks that the server-side S3 strategy builds no client and sets the AES-256 SSE algorithm on the metadata. */
+    @Test
+    public void testServerSideS3EncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ServerSideS3EncryptionStrategy();
+
+        // This shows that the strategy does *not* build a client:
+        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+
+        // This shows that the strategy sets the SSE algorithm field as expected:
+        strategy.configurePutObjectRequest(putObjectRequest, metadata, "");
+        Assert.assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, metadata.getSSEAlgorithm());
+    }
+
+    /** Checks that the no-op strategy builds no client and leaves the put request and metadata unchanged. */
+    @Test
+    public void testNoOpEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new NoOpEncryptionStrategy();
+
+        // This shows that the strategy does *not* build a client:
+        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+
+        // This shows the request and metadata start with various null objects:
+        Assert.assertNull(metadata.getSSEAlgorithm());
+        Assert.assertNull(putObjectRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+
+        // Act:
+        strategy.configurePutObjectRequest(putObjectRequest, metadata, "");
+
+        // This shows that the request and metadata were not changed:
+        Assert.assertNull(metadata.getSSEAlgorithm());
+        Assert.assertNull(putObjectRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
new file mode 100644
index 0000000..d040d34
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import java.util.List;
+
+
+public class TestStandardS3EncryptionService {
+    private StandardS3EncryptionService service;
+    private ConfigurationContext context;
+    private String strategyName;
+    private String keyIdOrMaterial;
+    private String region;
+
+    @Before
+    public void setup() throws InitializationException {
+        service = new StandardS3EncryptionService();
+        context = Mockito.mock(ConfigurationContext.class);
+        // Every test below runs against a service configured with STRATEGY_NAME_NONE.
+        strategyName = StandardS3EncryptionService.STRATEGY_NAME_NONE;
+        keyIdOrMaterial = "test-key-id";
+        region = "us-west-1";
+        // Stub the three property look-ups, then hand the mocked context to the service.
+        Mockito.when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
+        Mockito.when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
+        Mockito.when(context.getProperty(StandardS3EncryptionService.REGION)).thenReturn(new MockPropertyValue(region));
+        service.onConfigured(context);
+    }
+
+    @Test
+    public void testServiceProperties() {
+        Assert.assertEquals(region, service.getRegion());
+        Assert.assertEquals(strategyName, service.getStrategyName());
+    }
+
+    @Test
+    public void testCreateClientReturnsNull() {
+        Assert.assertNull(service.createEncryptionClient(null, null));
+    }
+
+    @Test
+    public void testRequests() {
+        final ObjectMetadata metadata = new ObjectMetadata();
+        final GetObjectRequest getObjectRequest = new GetObjectRequest("", "");
+        final InitiateMultipartUploadRequest initUploadRequest = new InitiateMultipartUploadRequest("", "");
+        final PutObjectRequest putObjectRequest = new PutObjectRequest("", "", "");
+        final UploadPartRequest uploadPartRequest = new UploadPartRequest();
+        // With STRATEGY_NAME_NONE configured, no configure* call may set an SSE key or algorithm.
+        service.configureGetObjectRequest(getObjectRequest, metadata);
+        Assert.assertNull(getObjectRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        service.configureUploadPartRequest(uploadPartRequest, metadata);
+        Assert.assertNull(uploadPartRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        service.configurePutObjectRequest(putObjectRequest, metadata);
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        service.configureInitiateMultipartUploadRequest(initUploadRequest, metadata);
+        Assert.assertNull(initUploadRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+    }
+
+    @Test
+    public void testProperties() {
+        List<PropertyDescriptor> properties = service.getSupportedPropertyDescriptors();
+        Assert.assertEquals(3, properties.size());
+        // The descriptors must come back in this order: strategy, value, region.
+        Assert.assertEquals(StandardS3EncryptionService.ENCRYPTION_STRATEGY.getName(), properties.get(0).getName());
+        Assert.assertEquals(StandardS3EncryptionService.ENCRYPTION_VALUE.getName(), properties.get(1).getName());
+        Assert.assertEquals(StandardS3EncryptionService.REGION.getName(), properties.get(2).getName());
+    }
+}

Reply via email to