This is an automated email from the ASF dual-hosted git repository. alopresto pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new e841f4d NIFI-4256 Adds AWS Encryption Controller Service. NIFI-4256 Adds AWS S3 FlowFile encryption attributes, more javadocs, better names. e841f4d is described below commit e841f4d5b7da7cf88cbb1805d351bf75ca8a3280 Author: Troy Melhase <t...@troy.io> AuthorDate: Sat Jul 6 11:59:44 2019 -0800 NIFI-4256 Adds AWS Encryption Controller Service. NIFI-4256 Adds AWS S3 FlowFile encryption attributes, more javadocs, better names. This closes #3574. Signed-off-by: Andy LoPresto <alopre...@apache.org> --- .../nifi/processors/aws/AbstractAWSProcessor.java | 4 +- .../processors/aws/s3/AbstractS3Processor.java | 31 ++-- .../aws/s3/AmazonS3EncryptionService.java | 80 +++++++++ .../nifi-aws-bundle/nifi-aws-processors/pom.xml | 5 + .../nifi/processors/aws/s3/FetchS3Object.java | 14 +- .../apache/nifi/processors/aws/s3/PutS3Object.java | 37 +++- .../ClientSideCMKEncryptionStrategy.java | 95 ++++++++++ .../ClientSideKMSEncryptionStrategy.java | 65 +++++++ .../aws/s3/encryption/NoOpEncryptionStrategy.java | 20 +++ .../aws/s3/encryption/S3EncryptionStrategy.java | 92 ++++++++++ .../ServerSideCEKEncryptionStrategy.java | 75 ++++++++ .../ServerSideKMSEncryptionStrategy.java | 43 +++++ .../encryption/ServerSideS3EncryptionStrategy.java | 36 ++++ .../s3/encryption/StandardS3EncryptionService.java | 189 ++++++++++++++++++++ .../org.apache.nifi.controller.ControllerService | 4 +- .../additionalDetails.html | 71 ++++++++ .../nifi/processors/aws/s3/AbstractS3IT.java | 27 +++ .../nifi/processors/aws/s3/ITPutS3Object.java | 198 ++++++++++++++++++++- .../nifi/processors/aws/s3/TestFetchS3Object.java | 4 +- .../nifi/processors/aws/s3/TestPutS3Object.java | 7 +- .../s3/encryption/TestS3EncryptionStrategies.java | 180 +++++++++++++++++++ .../TestStandardS3EncryptionService.java | 102 +++++++++++ 22 files changed, 1347 insertions(+), 32 deletions(-) diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index 06f4a16..10d19ff 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -157,11 +157,11 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH}; public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS); - protected static AllowableValue createAllowableValue(final Regions region) { + public static AllowableValue createAllowableValue(final Regions region) { return new AllowableValue(region.getName(), region.getDescription(), "AWS Region Code : " + region.getName()); } - protected static AllowableValue[] getAvailableRegions() { + public static AllowableValue[] getAvailableRegions() { final List<AllowableValue> values = new ArrayList<>(); for (final Regions region : Regions.values()) { values.add(createAllowableValue(region)); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java index 5cc24e9..5e8ff32 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java +++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import com.amazonaws.auth.AWSStaticCredentialsProvider; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -126,19 +127,34 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider new AllowableValue("S3SignerType", "Signature v2")) .defaultValue("Default Signature") .build(); + public static final PropertyDescriptor ENCRYPTION_SERVICE = new PropertyDescriptor.Builder() + .name("encryption-service") + .displayName("Encryption Service") + .description("Specifies the Encryption Service Controller used configure requests. " + + "For backward compatibility, this value is ignored when 'Server Side Encryption' is set.") + .required(false) + .identifiesControllerService(AmazonS3EncryptionService.class) + .build(); + /** * Create client using credentials provider. 
This is the preferred way for creating clients */ @Override protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { getLogger().info("Creating client with credentials provider"); - initializeSignerOverride(context, config); + AmazonS3EncryptionService encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class); + AmazonS3Client s3 = null; - final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, config); + if (encryptionService != null) { + s3 = encryptionService.createEncryptionClient(credentialsProvider, config); + } - initalizeEndpointOverride(context, s3); + if (s3 == null) { + s3 = new AmazonS3Client(credentialsProvider, config); + } + initalizeEndpointOverride(context, s3); return s3; } @@ -167,14 +183,7 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider @Override protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { getLogger().info("Creating client with AWS credentials"); - - initializeSignerOverride(context, config); - - final AmazonS3Client s3 = new AmazonS3Client(credentials, config); - - initalizeEndpointOverride(context, s3); - - return s3; + return createClient(context, new AWSStaticCredentialsProvider(credentials), config); } protected Grantee createGrantee(final String value) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java new file mode 100644 index 0000000..946a1cc --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AmazonS3EncryptionService.java @@ 
-0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.UploadPartRequest; +import org.apache.nifi.controller.ControllerService; + +/** + * This interface defines how clients interact with an S3 encryption service. + */ +public interface AmazonS3EncryptionService extends ControllerService { + + /** + * Configure a {@link PutObjectRequest} for encryption. + * @param request the request to configure. + * @param objectMetadata the request metadata to configure. + */ + void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata); + + /** + * Configure an {@link InitiateMultipartUploadRequest} for encryption. + * @param request the request to configure. + * @param objectMetadata the request metadata to configure. 
+ */ + void configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, ObjectMetadata objectMetadata); + + /** + * Configure a {@link GetObjectRequest} for encryption. + * @param request the request to configure. + * @param objectMetadata the request metadata to configure. + */ + void configureGetObjectRequest(GetObjectRequest request, ObjectMetadata objectMetadata); + + /** + * Configure an {@link UploadPartRequest} for encryption. + * @param request the request to configure. + * @param objectMetadata the request metadata to configure. + */ + void configureUploadPartRequest(UploadPartRequest request, ObjectMetadata objectMetadata); + + /** + * Create an S3 encryption client. + * + * @param credentialsProvider AWS credentials provider. + * @param clientConfiguration Client configuration. + * @return {@link AmazonS3Client}, perhaps an {@link com.amazonaws.services.s3.AmazonS3EncryptionClient} + */ + AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration); + + /** + * @return The region associated with the service, as a String. + */ + String getRegion(); + + /** + * @return The name of the encryption strategy associated with the service. 
+ */ + String getStrategyName(); +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml index b6d7931..5e739e1 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml @@ -73,6 +73,11 @@ <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-sts</artifactId> </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index b66468a..8dabeb9 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -64,7 +64,8 @@ import com.amazonaws.services.s3.model.S3Object; @WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"), @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"), @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"), - @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),}) + @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"), + @WritesAttribute(attribute = "s3.encryptionStrategy", description = "The name of the encryption strategy, if any was set"),}) public class FetchS3Object extends AbstractS3Processor { 
public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() @@ -89,8 +90,8 @@ public class FetchS3Object extends AbstractS3Processor { public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID, - SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, - REQUESTER_PAYS)); + SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, ENCRYPTION_SERVICE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, + PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, REQUESTER_PAYS)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -120,6 +121,13 @@ public class FetchS3Object extends AbstractS3Processor { request.setRequesterPays(requesterPays); final Map<String, String> attributes = new HashMap<>(); + + AmazonS3EncryptionService encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class); + if (encryptionService != null) { + encryptionService.configureGetObjectRequest(request, new ObjectMetadata()); + attributes.put("s3.encryptionStrategy", encryptionService.getStrategyName()); + } + try (final S3Object s3Object = client.getObject(request)) { flowFile = session.importFrom(s3Object.getObjectContent(), flowFile); attributes.put("s3.bucket", s3Object.getBucketName()); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 537d269..81c55b6 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -122,8 +122,8 @@ import com.amazonaws.services.s3.model.UploadPartResult; "the S3 object, if one is set"), @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server side encryption algorithm of the object"), @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of " + - "the S3 object, if any was set") -}) + "the S3 object, if any was set"), + @WritesAttribute(attribute = "s3.encryptionStrategy", description = "The name of the encryption strategy, if any was set"),}) public class PutS3Object extends AbstractS3Processor { public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L; @@ -235,7 +235,8 @@ public class PutS3Object extends AbstractS3Processor { Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, REMOVE_TAG_PREFIX, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, MULTIPART_S3_AGEOFF_INTERVAL, - MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); + MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, + PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD)); final static String S3_BUCKET_KEY = "s3.bucket"; final static String S3_OBJECT_KEY = "s3.key"; @@ -251,6 +252,8 @@ public class PutS3Object extends AbstractS3Processor { final static String S3_API_METHOD_PUTOBJECT = "putobject"; final static String S3_API_METHOD_MULTIPARTUPLOAD = "multipartupload"; final static String S3_SSE_ALGORITHM = "s3.sseAlgorithm"; + final static String 
S3_ENCRYPTION_STRATEGY = "s3.encryptionStrategy"; + final static String S3_PROCESS_UNSCHEDULED_MESSAGE = "Processor unscheduled, stopping upload"; @@ -276,6 +279,7 @@ public class PutS3Object extends AbstractS3Processor { protected boolean localUploadExistsInS3(final AmazonS3Client s3, final String bucket, final MultipartState localState) { ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest(bucket); MultipartUploadListing listing = s3.listMultipartUploads(listRequest); + for (MultipartUpload upload : listing.getMultipartUploads()) { if (upload.getUploadId().equals(localState.getUploadId())) { return true; @@ -473,9 +477,13 @@ public class PutS3Object extends AbstractS3Processor { } final String serverSideEncryption = context.getProperty(SERVER_SIDE_ENCRYPTION).getValue(); + AmazonS3EncryptionService encryptionService = null; + if (!serverSideEncryption.equals(NO_SERVER_SIDE_ENCRYPTION)) { objectMetadata.setSSEAlgorithm(serverSideEncryption); attributes.put(S3_SSE_ALGORITHM, serverSideEncryption); + } else { + encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class); } if (!userMetadata.isEmpty()) { @@ -487,12 +495,17 @@ public class PutS3Object extends AbstractS3Processor { // single part upload //---------------------------------------- final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata); - request.setStorageClass( - StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue())); + if (encryptionService != null) { + encryptionService.configurePutObjectRequest(request, objectMetadata); + attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName()); + } + + request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue())); final AccessControlList acl = createACL(context, ff); if (acl != null) { request.setAccessControlList(acl); } + final CannedAccessControlList cannedAcl = createCannedACL(context, ff); if 
(cannedAcl != null) { request.withCannedAcl(cannedAcl); @@ -583,9 +596,13 @@ public class PutS3Object extends AbstractS3Processor { // initiate multipart upload or find position in file //------------------------------------------------------------ if (currentState.getUploadId().isEmpty()) { - final InitiateMultipartUploadRequest initiateRequest = - new InitiateMultipartUploadRequest(bucket, key, objectMetadata); + final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(bucket, key, objectMetadata); + if (encryptionService != null) { + encryptionService.configureInitiateMultipartUploadRequest(initiateRequest, objectMetadata); + attributes.put(S3_ENCRYPTION_STRATEGY, encryptionService.getStrategyName()); + } initiateRequest.setStorageClass(currentState.getStorageClass()); + final AccessControlList acl = createACL(context, ff); if (acl != null) { initiateRequest.setAccessControlList(acl); @@ -662,6 +679,9 @@ public class PutS3Object extends AbstractS3Processor { .withInputStream(in) .withPartNumber(part) .withPartSize(thisPartSize); + if (encryptionService != null) { + encryptionService.configureUploadPartRequest(uploadRequest, objectMetadata); + } try { UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest); currentState.addPartETag(uploadPartResult.getPartETag()); @@ -686,6 +706,8 @@ public class PutS3Object extends AbstractS3Processor { //------------------------------------------------------------ CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest( bucket, key, currentState.getUploadId(), currentState.getPartETags()); + + // No call to an encryption service is needed for a CompleteMultipartUploadRequest. 
try { CompleteMultipartUploadResult completeResult = s3.completeMultipartUpload(completeRequest); @@ -812,6 +834,7 @@ public class PutS3Object extends AbstractS3Processor { final String uploadId = upload.getUploadId(); final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest( bucket, uploadKey, uploadId); + // No call to an encryption service is necessary for an AbortMultipartUploadRequest. try { s3.abortMultipartUpload(abortRequest); getLogger().info("Aborting out of date multipart upload, bucket {} key {} ID {}, initiated {}", diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java new file mode 100644 index 0000000..d157ea5 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideCMKEncryptionStrategy.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.s3.encryption; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.AmazonS3EncryptionClient; +import com.amazonaws.services.s3.model.CryptoConfiguration; +import com.amazonaws.services.s3.model.EncryptionMaterials; +import com.amazonaws.services.s3.model.StaticEncryptionMaterialsProvider; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.ValidationResult; + +import javax.crypto.spec.SecretKeySpec; + +/** + * This strategy uses a client master key to perform client-side encryption. Use this strategy when you want the client to perform the encryption, + * (thus incurring the cost of processing) and when you want to manage the key material yourself. + * + * See https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingClientSideEncryption.html#client-side-encryption-client-side-master-key-intro + * + */ +public class ClientSideCMKEncryptionStrategy implements S3EncryptionStrategy { + /** + * Create an encryption client. + * + * @param credentialsProvider AWS credentials provider. 
+ * @param clientConfiguration Client configuration + * @param region AWS region + * @param keyIdOrMaterial client master key, always base64 encoded + * @return AWS S3 client + */ + @Override + public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) throws SecurityException { + if (!validateKey(keyIdOrMaterial).isValid()) { + throw new SecurityException("Invalid client key; ensure key material is base64 encoded."); + } + + byte[] keyMaterial = Base64.decodeBase64(keyIdOrMaterial); + SecretKeySpec symmetricKey = new SecretKeySpec(keyMaterial, "AES"); + StaticEncryptionMaterialsProvider encryptionMaterialsProvider = new StaticEncryptionMaterialsProvider(new EncryptionMaterials(symmetricKey)); + boolean haveRegion = StringUtils.isNotBlank(region); + CryptoConfiguration cryptoConfig = new CryptoConfiguration(); + Region awsRegion = null; + + if (haveRegion) { + awsRegion = Region.getRegion(Regions.fromName(region)); + cryptoConfig.setAwsKmsRegion(awsRegion); + } + + AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, encryptionMaterialsProvider, cryptoConfig); + if (haveRegion && awsRegion != null) { + client.setRegion(awsRegion); + } + + return client; + } + + public ValidationResult validateKey(String keyValue) { + if (StringUtils.isBlank(keyValue) || !Base64.isBase64(keyValue)) { + return new ValidationResult.Builder().valid(false).build(); + } + + boolean decoded = false; + boolean sized = false; + byte[] keyMaterial; + + try { + keyMaterial = Base64.decodeBase64(keyValue); + decoded = true; + sized = keyMaterial.length == 32 || keyMaterial.length == 24 || keyMaterial.length == 16; + } catch (final Exception ignored) { + } + + return new ValidationResult.Builder().valid(decoded && sized).build(); + } +} diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java new file mode 100644 index 0000000..e6d75e4 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ClientSideKMSEncryptionStrategy.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3.encryption; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.AmazonS3EncryptionClient; +import com.amazonaws.services.s3.model.CryptoConfiguration; +import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider; +import org.apache.commons.lang3.StringUtils; + +/** + * This strategy uses KMS key id to perform client-side encryption. 
Use this strategy when you want the client to perform the encryption, + * (thus incurring the cost of processing) and manage the key in a KMS instance. + * + * See https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingClientSideEncryption.html#client-side-encryption-kms-managed-master-key-intro + * + */ +public class ClientSideKMSEncryptionStrategy implements S3EncryptionStrategy { + /** + * Create an encryption client. + * + * @param credentialsProvider AWS credentials provider. + * @param clientConfiguration Client configuration + * @param region AWS region + * @param keyIdOrMaterial KMS key id + * @return AWS S3 client + */ + @Override + public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) { + KMSEncryptionMaterialsProvider materialProvider = new KMSEncryptionMaterialsProvider(keyIdOrMaterial); + boolean haveRegion = StringUtils.isNotBlank(region); + Region awsRegion = null; + + CryptoConfiguration cryptoConfig = new CryptoConfiguration(); + if (haveRegion) { + awsRegion = Region.getRegion(Regions.fromName(region)); + cryptoConfig.setAwsKmsRegion(awsRegion); + } + + AmazonS3EncryptionClient client = new AmazonS3EncryptionClient(credentialsProvider, materialProvider, cryptoConfig); + if (haveRegion) { + client.setRegion(awsRegion); + } + + return client; + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/NoOpEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/NoOpEncryptionStrategy.java new file mode 100644 index 0000000..5307504 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/NoOpEncryptionStrategy.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3.encryption; + +public class NoOpEncryptionStrategy implements S3EncryptionStrategy { +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java new file mode 100644 index 0000000..677fc0e --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/S3EncryptionStrategy.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.nifi.components.ValidationResult;
+
+/**
+ * This interface defines the API for S3 encryption strategies. Every method has a default
+ * implementation (a no-op, a {@code null} client, or an always-valid result) so that an
+ * implementation only needs to override the operations it actually affects.
+ */
+public interface S3EncryptionStrategy {
+
+    /**
+     * Configure a {@link PutObjectRequest} for encryption. The default implementation does nothing.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     * @param keyValue the key id or key material.
+     */
+    default void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
+    }
+
+    /**
+     * Configure an {@link InitiateMultipartUploadRequest} for encryption. The default implementation does nothing.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     * @param keyValue the key id or key material.
+     */
+    default void configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, ObjectMetadata objectMetadata, String keyValue) {
+    }
+
+    /**
+     * Configure a {@link GetObjectRequest} for encryption. The default implementation does nothing.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     * @param keyValue the key id or key material.
+     */
+    default void configureGetObjectRequest(GetObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
+    }
+
+    /**
+     * Configure an {@link UploadPartRequest} for encryption. The default implementation does nothing.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata the request metadata to configure.
+     * @param keyValue the key id or key material.
+     */
+    default void configureUploadPartRequest(UploadPartRequest request, ObjectMetadata objectMetadata, String keyValue) {
+    }
+
+    /**
+     * Create an S3 encryption client. The default implementation ignores all arguments and
+     * returns {@code null}, so callers must be prepared for a {@code null} result.
+     *
+     * @param credentialsProvider AWS credentials provider.
+     * @param clientConfiguration Client configuration.
+     * @param region region value supplied by the caller (presumably the KMS key region; confirm against implementations).
+     * @param keyIdOrMaterial the key id or key material.
+     * @return {@link AmazonS3Client}, perhaps an {@link com.amazonaws.services.s3.AmazonS3EncryptionClient},
+     *         or {@code null} when the strategy does not need a dedicated client.
+     * @throws SecurityException declared for implementations; the default implementation never throws.
+     */
+    default AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, String region, String keyIdOrMaterial) throws SecurityException {
+        return null;
+    }
+
+    /**
+     * Validate a key id or key material. The default implementation reports every value as valid.
+     *
+     * @param keyValue key id or key material to validate.
+     * @return ValidationResult instance.
+     */
+    default ValidationResult validateKey(String keyValue) {
+        return new ValidationResult.Builder().valid(true).build();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java
new file mode 100644
index 0000000..231a5c8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideCEKEncryptionStrategy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.SSECustomerKey;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.nifi.components.ValidationResult;
+import org.bouncycastle.util.encoders.Base64;
+
+/**
+ * This strategy uses a customer key to perform server-side encryption. Use this strategy when you want the server to perform the encryption,
+ * (meaning you pay the cost of processing) and when you want to manage the key material yourself.
+ *
+ * The same base64-encoded key is attached to every request type (put, multipart initiate, upload part and get).
+ *
+ * See https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
+ *
+ */
+public class ServerSideCEKEncryptionStrategy implements S3EncryptionStrategy {
+    /** SSE-C only accepts AES-256 keys, i.e. exactly 32 bytes of decoded key material. */
+    private static final int KEY_LENGTH_BYTES = 32;
+
+    /** Subject reported on validation failures; the key material itself is never echoed back. */
+    private static final String KEY_SUBJECT = "Key Material";
+
+    /**
+     * Attaches the customer-supplied key to a single-part put request.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata unused by this strategy.
+     * @param keyValue base64-encoded key material.
+     */
+    @Override
+    public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        SSECustomerKey customerKey = new SSECustomerKey(keyValue);
+        request.setSSECustomerKey(customerKey);
+    }
+
+    /**
+     * Attaches the customer-supplied key to a multipart-upload initiation request.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata unused by this strategy.
+     * @param keyValue base64-encoded key material.
+     */
+    @Override
+    public void configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        SSECustomerKey customerKey = new SSECustomerKey(keyValue);
+        request.setSSECustomerKey(customerKey);
+    }
+
+    /**
+     * Attaches the customer-supplied key to a get request so the object can be read back.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata unused by this strategy.
+     * @param keyValue base64-encoded key material.
+     */
+    @Override
+    public void configureGetObjectRequest(GetObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        SSECustomerKey customerKey = new SSECustomerKey(keyValue);
+        request.setSSECustomerKey(customerKey);
+    }
+
+    /**
+     * Attaches the customer-supplied key to an individual part upload.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata unused by this strategy.
+     * @param keyValue base64-encoded key material.
+     */
+    @Override
+    public void configureUploadPartRequest(UploadPartRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        SSECustomerKey customerKey = new SSECustomerKey(keyValue);
+        request.setSSECustomerKey(customerKey);
+    }
+
+    /**
+     * Validates that the supplied value is usable as SSE-C key material: it must be non-empty,
+     * must be valid base64, and must decode to exactly 32 bytes (a 256-bit AES key).
+     * Every failure carries a subject and an explanation so the user can see why the value was rejected.
+     *
+     * @param keyValue base64-encoded key material; may be {@code null}.
+     * @return a valid result only for a base64-encoded 256-bit key.
+     */
+    @Override
+    public ValidationResult validateKey(String keyValue) {
+        final ValidationResult.Builder result = new ValidationResult.Builder().subject(KEY_SUBJECT);
+
+        if (keyValue == null || keyValue.trim().isEmpty()) {
+            return result.valid(false).explanation("it is empty").build();
+        }
+
+        final byte[] keyMaterial;
+        try {
+            keyMaterial = Base64.decode(keyValue);
+        } catch (final RuntimeException e) {
+            // The decoder signals malformed input with an unchecked exception; report it rather than swallow it.
+            return result.valid(false).explanation("it is not valid base64").build();
+        }
+
+        if (keyMaterial.length != KEY_LENGTH_BYTES) {
+            return result.valid(false)
+                    .explanation("it decodes to " + keyMaterial.length + " bytes but a 256-bit (32 byte) AES key is required")
+                    .build();
+        }
+
+        return result.valid(true).build();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideKMSEncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideKMSEncryptionStrategy.java
new file mode 100644
index 0000000..a31222e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideKMSEncryptionStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+
+/**
+ * Server-side encryption backed by a KMS key: S3 performs the encryption (so the processing
+ * cost is on the server side) using the KMS key identified by the supplied key value.
+ *
+ * See https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
+ *
+ */
+public class ServerSideKMSEncryptionStrategy implements S3EncryptionStrategy {
+    /**
+     * Sets the KMS key parameters on a single-part put request.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata unused by this strategy.
+     * @param keyValue the KMS key id.
+     */
+    @Override
+    public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        request.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(keyValue));
+    }
+
+    /**
+     * Sets the KMS key parameters on a multipart-upload initiation request.
+     *
+     * @param request the request to configure.
+     * @param objectMetadata unused by this strategy.
+     * @param keyValue the KMS key id.
+     */
+    @Override
+    public void configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        request.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(keyValue));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
new file mode 100644
index 0000000..0845e12
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/ServerSideS3EncryptionStrategy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+
+
+/**
+ * This strategy uses S3-managed keys to perform server-side encryption. Use this strategy when you want the server to
+ * perform the encryption (meaning you pay the cost of processing) and you want AWS to completely manage the key.
+ *
+ * The algorithm is requested through the object metadata, for both single-part puts and multipart uploads.
+ *
+ * See https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
+ *
+ */
+public class ServerSideS3EncryptionStrategy implements S3EncryptionStrategy {
+    /**
+     * Requests AES-256 server-side encryption for a single-part put.
+     *
+     * @param request unused; the algorithm is carried by the metadata.
+     * @param objectMetadata the metadata on which the SSE algorithm is set.
+     * @param keyValue unused; S3 manages the key.
+     */
+    @Override
+    public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+    }
+
+    /**
+     * Requests AES-256 server-side encryption for a multipart upload. Without this override a
+     * multipart upload that never goes through {@link #configurePutObjectRequest} would be
+     * initiated with no SSE algorithm at all.
+     * NOTE(review): assumes the caller sends this same {@code objectMetadata} instance with the
+     * initiate request; confirm against the calling processor.
+     *
+     * @param request unused; the algorithm is carried by the metadata.
+     * @param objectMetadata the metadata on which the SSE algorithm is set.
+     * @param keyValue unused; S3 manages the key.
+     */
+    @Override
+    public void configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, ObjectMetadata objectMetadata, String keyValue) {
+        objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
new file mode 100644
index 0000000..21d8d9e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/encryption/StandardS3EncryptionService.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import org.apache.nifi.processors.aws.s3.AmazonS3EncryptionService;
+import org.apache.nifi.processors.aws.s3.AbstractS3Processor;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Controller service that holds one {@link S3EncryptionStrategy}, selected by name, together with
+ * its key value and region, and delegates every S3 request-configuration call to that strategy.
+ * The active strategy is (re)resolved each time the service is enabled.
+ */
+@Tags({"service", "encryption", "encrypt", "decryption", "decrypt", "key"})
+@CapabilityDescription("Adds configurable encryption to S3 Put and S3 Fetch operations.")
+public class StandardS3EncryptionService extends AbstractControllerService implements AmazonS3EncryptionService {
+    private static final Logger logger = LoggerFactory.getLogger(StandardS3EncryptionService.class);
+
+    public static final String STRATEGY_NAME_NONE = "NONE";
+    public static final String STRATEGY_NAME_SSE_S3 = "SSE_S3";
+    public static final String STRATEGY_NAME_SSE_KMS = "SSE_KMS";
+    public static final String STRATEGY_NAME_SSE_C = "SSE_C";
+    public static final String STRATEGY_NAME_CSE_KMS = "CSE_KMS";
+    public static final String STRATEGY_NAME_CSE_CMK = "CSE_CMK";
+
+    /** Strategy lookup by name; built once and never modified afterwards. */
+    private static final Map<String, S3EncryptionStrategy> namedStrategies = createNamedStrategies();
+
+    private static final AllowableValue NONE = new AllowableValue(STRATEGY_NAME_NONE, "None","No encryption.");
+    private static final AllowableValue SSE_S3 = new AllowableValue(STRATEGY_NAME_SSE_S3, "Server-side S3","Use server-side, S3-managed encryption.");
+    private static final AllowableValue SSE_KMS = new AllowableValue(STRATEGY_NAME_SSE_KMS, "Server-side KMS","Use server-side, KMS key to perform encryption.");
+    private static final AllowableValue SSE_C = new AllowableValue(STRATEGY_NAME_SSE_C, "Server-side Customer Key","Use server-side, customer-supplied key for encryption.");
+    private static final AllowableValue CSE_KMS = new AllowableValue(STRATEGY_NAME_CSE_KMS, "Client-side KMS","Use client-side, KMS key to perform encryption.");
+    private static final AllowableValue CSE_CMK = new AllowableValue(STRATEGY_NAME_CSE_CMK, "Client-side Customer Master Key","Use client-side, customer-supplied master key to perform encryption.");
+
+    public static final PropertyDescriptor ENCRYPTION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("encryption-strategy")
+            .displayName("Encryption Strategy")
+            .description("Strategy to use for S3 data encryption and decryption.")
+            .allowableValues(NONE, SSE_S3, SSE_KMS, SSE_C, CSE_KMS, CSE_CMK)
+            .required(true)
+            .defaultValue(NONE.getValue())
+            .build();
+
+    public static final PropertyDescriptor ENCRYPTION_VALUE = new PropertyDescriptor.Builder()
+            .name("key-id-or-key-material")
+            .displayName("Key ID or Key Material")
+            .description("For Server-side CEK and Client-side CMK, this is base64-encoded Key Material. For all others (except 'None'), it is the KMS Key ID.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(new StandardValidators.StringLengthValidator(0, 4096))
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
+            .name("region")
+            .required(false)
+            .allowableValues(AbstractS3Processor.getAvailableRegions())
+            .defaultValue(AbstractS3Processor.createAllowableValue(Regions.DEFAULT_REGION).getValue())
+            .build();
+
+    private String keyValue = "";
+    private String region = "";
+    private S3EncryptionStrategy encryptionStrategy = new NoOpEncryptionStrategy();
+    private String strategyName = STRATEGY_NAME_NONE;
+
+    /**
+     * Builds the name-to-strategy table. A plain map wrapped as unmodifiable is used instead of
+     * double-brace initialisation, which would create an anonymous, mutable HashMap subclass.
+     */
+    private static Map<String, S3EncryptionStrategy> createNamedStrategies() {
+        final Map<String, S3EncryptionStrategy> strategies = new HashMap<>();
+        strategies.put(STRATEGY_NAME_NONE, new NoOpEncryptionStrategy());
+        strategies.put(STRATEGY_NAME_SSE_S3, new ServerSideS3EncryptionStrategy());
+        strategies.put(STRATEGY_NAME_SSE_KMS, new ServerSideKMSEncryptionStrategy());
+        strategies.put(STRATEGY_NAME_SSE_C, new ServerSideCEKEncryptionStrategy());
+        strategies.put(STRATEGY_NAME_CSE_KMS, new ClientSideKMSEncryptionStrategy());
+        strategies.put(STRATEGY_NAME_CSE_CMK, new ClientSideCMKEncryptionStrategy());
+        return Collections.unmodifiableMap(strategies);
+    }
+
+    /**
+     * Resolves the configured strategy, key value and region when the service is enabled.
+     * State is only replaced once the strategy name has been resolved successfully.
+     *
+     * @param context the configuration to read the properties from.
+     * @throws InitializationException if no strategy is registered under the configured name.
+     */
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) throws InitializationException {
+        final String newStrategyName = context.getProperty(ENCRYPTION_STRATEGY).getValue();
+        // The property declares variable-registry Expression Language support, so it has to be evaluated.
+        final String newKeyValue = context.getProperty(ENCRYPTION_VALUE).evaluateAttributeExpressions().getValue();
+        final S3EncryptionStrategy newEncryptionStrategy = namedStrategies.get(newStrategyName);
+        String newRegion = null;
+
+        if (context.getProperty(REGION) != null ) {
+            newRegion = context.getProperty(REGION).getValue();
+        }
+
+        if (newEncryptionStrategy == null) {
+            // Report the name that failed to resolve, not the previously active strategy name.
+            final String msg = "No encryption strategy found for name: " + newStrategyName;
+            logger.warn(msg);
+            throw new InitializationException(msg);
+        }
+
+        strategyName = newStrategyName;
+        encryptionStrategy = newEncryptionStrategy;
+        keyValue = newKeyValue;
+        region = newRegion;
+    }
+
+    /**
+     * Validates the key value against the strategy currently selected in the validation context.
+     * The strategy is looked up from the context rather than taken from the instance field, because
+     * the field only reflects the configuration as of the last time the service was enabled.
+     *
+     * @param validationContext the configuration being validated.
+     * @return a single-element collection holding the outcome of the key validation.
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final Collection<ValidationResult> validationResults = new ArrayList<>();
+        final String selectedStrategyName = validationContext.getProperty(ENCRYPTION_STRATEGY).getValue();
+        final S3EncryptionStrategy selectedStrategy = namedStrategies.get(selectedStrategyName);
+
+        if (selectedStrategy == null) {
+            validationResults.add(new ValidationResult.Builder()
+                    .subject(ENCRYPTION_STRATEGY.getDisplayName())
+                    .valid(false)
+                    .explanation("no encryption strategy found for name: " + selectedStrategyName)
+                    .build());
+            return validationResults;
+        }
+
+        final String candidateKeyValue = validationContext.getProperty(ENCRYPTION_VALUE).evaluateAttributeExpressions().getValue();
+        validationResults.add(selectedStrategy.validateKey(candidateKeyValue));
+        return validationResults;
+    }
+
+    /**
+     * @return the strategy, key and region property descriptors, in that order, as an unmodifiable list.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(ENCRYPTION_STRATEGY);
+        properties.add(ENCRYPTION_VALUE);
+        properties.add(REGION);
+        return Collections.unmodifiableList(properties);
+    }
+
+    /** Delegates to the active strategy, supplying the configured key value. */
+    @Override
+    public void configurePutObjectRequest(PutObjectRequest request, ObjectMetadata objectMetadata) {
+        encryptionStrategy.configurePutObjectRequest(request, objectMetadata, keyValue);
+    }
+
+    /** Delegates to the active strategy, supplying the configured key value. */
+    @Override
+    public void configureInitiateMultipartUploadRequest(InitiateMultipartUploadRequest request, ObjectMetadata objectMetadata) {
+        encryptionStrategy.configureInitiateMultipartUploadRequest(request, objectMetadata, keyValue);
+    }
+
+    /** Delegates to the active strategy, supplying the configured key value. */
+    @Override
+    public void configureGetObjectRequest(GetObjectRequest request, ObjectMetadata objectMetadata) {
+        encryptionStrategy.configureGetObjectRequest(request, objectMetadata, keyValue);
+    }
+
+    /** Delegates to the active strategy, supplying the configured key value. */
+    @Override
+    public void configureUploadPartRequest(UploadPartRequest request, ObjectMetadata objectMetadata) {
+        encryptionStrategy.configureUploadPartRequest(request, objectMetadata, keyValue);
+    }
+
+    /**
+     * Asks the active strategy for an encryption client, supplying the configured region and key value.
+     *
+     * @return whatever the strategy returns; the strategy interface's default is {@code null}.
+     */
+    @Override
+    public AmazonS3Client createEncryptionClient(AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration) {
+        return encryptionStrategy.createEncryptionClient(credentialsProvider, clientConfiguration, region, keyValue);
+    }
+
+    /** @return the region read when the service was last enabled; may be {@code null}. */
+    @Override
+    public String getRegion() {
+        return region;
+    }
+
+    /** @return the name of the strategy that is currently active. */
+    @Override
+    public String getStrategyName() {
+        return strategyName;
+    }
+}
+
+
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 5e2dea4..390d4c9 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -12,4 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService \ No newline at end of file +org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService + +org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html new file mode 100644 index 0000000..108a0b7 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/docs/org.apache.nifi.processors.aws.s3.encryption.S3EncryptionService/additionalDetails.html @@ -0,0 +1,71 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>S3EncryptionService</title> + + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> +</head> +<body> + +<div> + The <code>S3EncryptionService</code> manages an encryption strategy and applies that strategy to various S3 operations. + + <br> + + <b>Note:</b> this service has no effect when a processor has the <code>SERVER_SIDE_ENCRYPTION</code> property set. To use + this service with processors so configured, first create a service instance, set the <code>Encryption Strategy</code> to <code>Server-side S3</code>, + disable the <code>SERVER_SIDE_ENCRYPTION</code> processor setting, and finally, associate the processor with the service. +</div> + + +<h2>Configuration Details</h2> +<h3>Encryption Strategy</h3> + +<div> + The name of the specific encryption strategy for this service to use when encrypting and decrypting S3 operations. 
+ + <ul> + <li><code>None</code> - no encryption is configured or applied.</li> + <li><code>Server-side S3</code> - encryption and decryption are managed by S3; no keys are required.</li> + <li><code>Server-side KMS</code> - encryption and decryption are performed by S3 using the configured KMS key.</li> + <li><code>Server-side Customer Key</code> - encryption and decryption are performed by S3 using the supplied customer key.</li> + <li><code>Client-side KMS</code> - like the server-side KMS strategy, with the encryption and decryption performed by the client.</li> + <li><code>Client-side Customer Master Key</code> - like the server-side customer key strategy, with the encryption and decryption performed by the client.</li> + </ul> +</div> + +<h3>Key ID or Key Material</h3> +<p> + When configured for either the server-side or client-side KMS strategy, this field should contain the ID or alias + of that key. +</p> +<p> + When configured for either the server-side or client-side customer key strategies, this field should contain the key + material, and that material must be base64 encoded. +</p> +<p> + All other encryption strategies ignore this field. +</p> + +<h3>Region</h3> +<div> + KMS key region, if any. This value must match the actual region of the KMS key if supplied. 
+</div> + +</body> +</html> diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java index 00f8567..0860085 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java @@ -17,6 +17,15 @@ package org.apache.nifi.processors.aws.s3; import com.amazonaws.auth.PropertiesCredentials; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.kms.AWSKMS; +import com.amazonaws.services.kms.AWSKMSClient; +import com.amazonaws.services.kms.model.CreateKeyRequest; +import com.amazonaws.services.kms.model.CreateKeyResult; +import com.amazonaws.services.kms.model.GenerateDataKeyRequest; +import com.amazonaws.services.kms.model.GenerateDataKeyResult; +import com.amazonaws.services.kms.model.ScheduleKeyDeletionRequest; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CreateBucketRequest; @@ -67,6 +76,7 @@ public abstract class AbstractS3IT { // Static so multiple Tests can use same client protected static AmazonS3Client client; + protected static AWSKMS kmsClient; @BeforeClass public static void oneTimeSetup() { @@ -82,6 +92,8 @@ public abstract class AbstractS3IT { try { final PropertiesCredentials credentials = new PropertiesCredentials(fis); client = new AmazonS3Client(credentials); + kmsClient = new AWSKMSClient(credentials); + kmsClient.setRegion(Region.getRegion(Regions.fromName(REGION))); if (client.doesBucketExist(BUCKET_NAME)) { fail("Bucket " + BUCKET_NAME + " exists. 
Choose a different bucket name to continue test"); @@ -185,4 +197,19 @@ public abstract class AbstractS3IT { return new File(uri); } + + protected static String getKMSKey() { + CreateKeyRequest cmkRequest = new CreateKeyRequest().withDescription("CMK for unit tests"); + CreateKeyResult cmkResult = kmsClient.createKey(cmkRequest); + + GenerateDataKeyRequest dekRequest = new GenerateDataKeyRequest().withKeyId(cmkResult.getKeyMetadata().getKeyId()).withKeySpec("AES_128"); + GenerateDataKeyResult dekResult = kmsClient.generateDataKey(dekRequest); + + return dekResult.getKeyId(); + } + + protected static void deleteKMSKey(String keyId) { + ScheduleKeyDeletionRequest req = new ScheduleKeyDeletionRequest().withKeyId(keyId).withPendingWindowInDays(7); + kmsClient.scheduleKeyDeletion(req); + } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index 1bbfa28..7a2cf3c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -21,6 +21,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -33,20 +34,28 @@ import com.amazonaws.services.s3.model.GetObjectTaggingResult; import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.Tag; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringUtils; import 
org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; +import org.apache.nifi.processors.aws.s3.encryption.StandardS3EncryptionService; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockPropertyValue; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -58,11 +67,17 @@ import com.amazonaws.services.s3.model.MultipartUploadListing; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.Region; import com.amazonaws.services.s3.model.StorageClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Provides integration level testing with actual AWS S3 resources for {@link PutS3Object} and requires additional configuration and resources to work. 
*/ public class ITPutS3Object extends AbstractS3IT { + private static final Logger logger = LoggerFactory.getLogger(ITPutS3Object.class); final static String TEST_ENDPOINT = "https://endpoint.com"; // final static String TEST_TRANSIT_URI = "https://" + BUCKET_NAME + ".endpoint.com"; @@ -74,6 +89,28 @@ public class ITPutS3Object extends AbstractS3IT { final static Pattern reS3ETag = Pattern.compile("[0-9a-fA-f]{32,32}(-[0-9]+)?"); + + private static String kmsKeyId = ""; + private static String randomKeyMaterial = ""; + + + @BeforeClass + public static void setupClass() { + byte[] keyRawBytes = new byte[32]; + SecureRandom secureRandom = new SecureRandom(); + secureRandom.nextBytes(keyRawBytes); + + randomKeyMaterial = Base64.encodeBase64String(keyRawBytes); + kmsKeyId = getKMSKey(); + } + + @AfterClass + public static void teardownClass() { + if (StringUtils.isNotEmpty(kmsKeyId)) { + deleteKMSKey(kmsKeyId); + } + } + @Test public void testSimplePut() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); @@ -185,7 +222,6 @@ public class ITPutS3Object extends AbstractS3IT { } - @Test public void testPutThenFetchWithoutSSE() throws IOException { testPutThenFetch(PutS3Object.NO_SERVER_SIDE_ENCRYPTION); @@ -827,7 +863,6 @@ public class ITPutS3Object extends AbstractS3IT { Assert.assertTrue(ff1.getSize() > S3_MAXIMUM_OBJECT_SIZE); } - @Ignore @Test public void testS3MultipartAgeoff() throws InterruptedException, IOException { final PutS3Object processor = new PutS3Object(); @@ -910,6 +945,165 @@ public class ITPutS3Object extends AbstractS3IT { Assert.assertEquals("true", objectTags.get(0).getValue()); } + @Test + public void testEncryptionServiceWithServerSideS3ManagedEncryptionStrategy() throws IOException, InitializationException { + TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, ""); + + Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "test.txt"); + 
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
+        MockFlowFile putSuccess = flowFiles.get(0);
+        Assert.assertEquals(StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
+
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_S3, "");
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_S3);
+    }
+
+    @Test
+    public void testEncryptionServiceWithServerSideKMSEncryptionStrategy() throws IOException, InitializationException {
+        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
+        MockFlowFile putSuccess = flowFiles.get(0);
+        Assert.assertEquals(StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
+
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS, kmsKeyId);
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, "aws:kms");
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_KMS);
+    }
+
+    @Test
+    public void testEncryptionServiceWithServerSideCPEKEncryptionStrategy() throws IOException, InitializationException {
+        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
+        MockFlowFile putSuccess = flowFiles.get(0);
+        Assert.assertEquals(StandardS3EncryptionService.STRATEGY_NAME_SSE_C, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
+
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_SSE_C, randomKeyMaterial);
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        // successful fetch does not indicate type of original encryption:
+        flowFile.assertAttributeEquals(PutS3Object.S3_SSE_ALGORITHM, null);
+        // but it does indicate it via our specific attribute:
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_SSE_C);
+    }
+
+    @Test
+    public void testEncryptionServiceWithClientSideKMSEncryptionStrategy() throws InitializationException, IOException {
+        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
+        MockFlowFile putSuccess = flowFiles.get(0);
+        Assert.assertEquals(StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
+
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS, kmsKeyId);
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        flowFile.assertAttributeEquals("x-amz-wrap-alg", "kms");
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_CSE_KMS);
+    }
+
+    @Test
+    public void testEncryptionServiceWithClientSideCMKEncryptionStrategy() throws InitializationException, IOException {
+        TestRunner runner = createPutEncryptionTestRunner(StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, randomKeyMaterial);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "test.txt");
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        Assert.assertEquals(1, flowFiles.size());
+        Assert.assertEquals(0, runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE).size());
+        MockFlowFile putSuccess = flowFiles.get(0);
+        Assert.assertEquals(StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, putSuccess.getAttribute(PutS3Object.S3_ENCRYPTION_STRATEGY));
+
+        MockFlowFile flowFile = fetchEncryptedFlowFile(attrs, StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK, randomKeyMaterial);
+        flowFile.assertAttributeEquals(PutS3Object.S3_ENCRYPTION_STRATEGY, StandardS3EncryptionService.STRATEGY_NAME_CSE_CMK);
+        flowFile.assertContentEquals(getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        flowFile.assertAttributeExists("x-amz-key");
+        flowFile.assertAttributeNotEquals("x-amz-key", "");
+
+        flowFile.assertAttributeExists("x-amz-iv");
+        flowFile.assertAttributeNotEquals("x-amz-iv", "");
+    }
+
+    private static TestRunner createPutEncryptionTestRunner(String strategyName, String keyIdOrMaterial) throws InitializationException {
+        return createEncryptionTestRunner(new PutS3Object(), strategyName, keyIdOrMaterial);
+    }
+
+    private static MockFlowFile fetchEncryptedFlowFile(Map<String, String> attributes, String strategyName, String keyIdOrMaterial) throws InitializationException {
+        final TestRunner runner = createEncryptionTestRunner(new FetchS3Object(), strategyName, keyIdOrMaterial);
+        runner.enqueue(new byte[0], attributes);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
+        return flowFiles.get(0);
+    }
+
+    private static TestRunner createEncryptionTestRunner(Processor processor, String strategyName, String keyIdOrMaterial) throws InitializationException {
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        final StandardS3EncryptionService service = new StandardS3EncryptionService();
+        final ConfigurationContext context = mock(ConfigurationContext.class);
+
+        runner.addControllerService(PutS3Object.ENCRYPTION_SERVICE.getName(), service);
+        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutS3Object.REGION, REGION);
+        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(PutS3Object.ENCRYPTION_SERVICE, service.getIdentifier());
+        runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_STRATEGY, strategyName);
+        runner.setProperty(service, StandardS3EncryptionService.ENCRYPTION_VALUE, keyIdOrMaterial);
+        runner.setProperty(service, StandardS3EncryptionService.REGION, REGION);
+
+        when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
+        when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
+        when(context.getProperty(StandardS3EncryptionService.REGION)).thenReturn(new MockPropertyValue(REGION));
+
+        service.onConfigured(context);
+        runner.enableControllerService(service);
+
+        return runner;
+    }
+
     private class MockAmazonS3Client extends AmazonS3Client {
         MultipartUploadListing listing;
         public void setListing(MultipartUploadListing newlisting) {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 4326ed7..88ccfb0 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -24,7 +24,6 @@ import java.util.Map;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.proxy.ProxyConfigurationService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -252,7 +251,8 @@ public class TestFetchS3Object {
assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE)); assertTrue(pd.contains(FetchS3Object.TIMEOUT)); assertTrue(pd.contains(FetchS3Object.VERSION_ID)); - assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)); + assertTrue(pd.contains(FetchS3Object.ENCRYPTION_SERVICE)); + assertTrue(pd.contains(FetchS3Object.PROXY_CONFIGURATION_SERVICE)); assertTrue(pd.contains(FetchS3Object.PROXY_HOST)); assertTrue(pd.contains(FetchS3Object.PROXY_HOST_PORT)); assertTrue(pd.contains(FetchS3Object.PROXY_USERNAME)); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index 6bb750f..6ba6de8 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -29,7 +29,6 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.proxy.ProxyConfigurationService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -210,7 +209,7 @@ public class TestPutS3Object { public void testGetPropertyDescriptors() { PutS3Object processor = new PutS3Object(); List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); - assertEquals("size should be eq", 33, pd.size()); + assertEquals("size should be eq", 34, pd.size()); assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); assertTrue(pd.contains(PutS3Object.BUCKET)); @@ -232,7 +231,8 @@ public 
class TestPutS3Object { assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST)); assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST)); assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION)); - assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)); + assertTrue(pd.contains(PutS3Object.ENCRYPTION_SERVICE)); + assertTrue(pd.contains(PutS3Object.PROXY_CONFIGURATION_SERVICE)); assertTrue(pd.contains(PutS3Object.PROXY_HOST)); assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT)); assertTrue(pd.contains(PutS3Object.PROXY_USERNAME)); @@ -245,5 +245,4 @@ public class TestPutS3Object { assertTrue(pd.contains(PutS3Object.MULTIPART_S3_AGEOFF_INTERVAL)); assertTrue(pd.contains(PutS3Object.MULTIPART_S3_MAX_AGE)); } - } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java new file mode 100644 index 0000000..3261288 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestS3EncryptionStrategies.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.commons.codec.binary.Base64;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.SecureRandom;
+
+
+public class TestS3EncryptionStrategies {
+
+    private String randomKeyMaterial = "";
+    private String randomKeyId = "mock-key-id";
+    private String region = "us-west-1";
+
+    private ObjectMetadata metadata = null;
+    private PutObjectRequest putObjectRequest = null;
+    private InitiateMultipartUploadRequest initUploadRequest = null;
+    private GetObjectRequest getObjectRequest = null;
+    private UploadPartRequest uploadPartRequest = null;
+
+    @Before
+    public void setup() {
+        byte[] keyRawBytes = new byte[32];
+        SecureRandom secureRandom = new SecureRandom();
+        secureRandom.nextBytes(keyRawBytes);
+        randomKeyMaterial = Base64.encodeBase64String(keyRawBytes);
+
+        metadata = new ObjectMetadata();
+        putObjectRequest = new PutObjectRequest("", "", "");
+        initUploadRequest = new InitiateMultipartUploadRequest("", "");
+        getObjectRequest = new GetObjectRequest("", "");
+        uploadPartRequest = new UploadPartRequest();
+    }
+
+    @Test
+    public void testClientSideKMSEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ClientSideKMSEncryptionStrategy();
+
+        // This shows that the strategy builds a client:
+        Assert.assertNotNull(strategy.createEncryptionClient(null, null, region, randomKeyMaterial));
+
+        // This shows that the strategy does not modify the metadata or any of the requests:
+        Assert.assertNull(metadata.getSSEAlgorithm());
+        Assert.assertNull(putObjectRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+
+        Assert.assertNull(initUploadRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(initUploadRequest.getSSECustomerKey());
+
+        Assert.assertNull(getObjectRequest.getSSECustomerKey());
+
+        Assert.assertNull(uploadPartRequest.getSSECustomerKey());
+    }
+
+    @Test
+    public void testClientSideCMKEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ClientSideCMKEncryptionStrategy();
+
+        // This shows that the strategy builds a client:
+        Assert.assertNotNull(strategy.createEncryptionClient(null, null, region, randomKeyMaterial));
+
+        // This shows that the strategy does not modify the metadata or any of the requests:
+        Assert.assertNull(metadata.getSSEAlgorithm());
+        Assert.assertNull(putObjectRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+
+        Assert.assertNull(initUploadRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(initUploadRequest.getSSECustomerKey());
+
+        Assert.assertNull(getObjectRequest.getSSECustomerKey());
+
+        Assert.assertNull(uploadPartRequest.getSSECustomerKey());
+    }
+
+    @Test
+    public void testServerSideCEKEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ServerSideCEKEncryptionStrategy();
+
+        // This shows that the strategy does *not* build a client:
+        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+
+        // This shows that the strategy sets the SSE customer key as expected:
+        strategy.configurePutObjectRequest(putObjectRequest, metadata, randomKeyMaterial);
+        Assert.assertEquals(randomKeyMaterial, putObjectRequest.getSSECustomerKey().getKey());
+        Assert.assertNull(putObjectRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        // Same for InitiateMultipartUploadRequest:
+        strategy.configureInitiateMultipartUploadRequest(initUploadRequest, metadata, randomKeyMaterial);
+        Assert.assertEquals(randomKeyMaterial, initUploadRequest.getSSECustomerKey().getKey());
+        Assert.assertNull(initUploadRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        // Same for GetObjectRequest (assert on the request that was just configured):
+        strategy.configureGetObjectRequest(getObjectRequest, metadata, randomKeyMaterial);
+        Assert.assertEquals(randomKeyMaterial, getObjectRequest.getSSECustomerKey().getKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        // Same for UploadPartRequest:
+        strategy.configureUploadPartRequest(uploadPartRequest, metadata, randomKeyMaterial);
+        Assert.assertEquals(randomKeyMaterial, uploadPartRequest.getSSECustomerKey().getKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+    }
+
+    @Test
+    public void testServerSideKMSEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ServerSideKMSEncryptionStrategy();
+
+        // This shows that the strategy does *not* build a client:
+        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+
+        // This shows that the strategy sets the SSE KMS key id as expected:
+        strategy.configurePutObjectRequest(putObjectRequest, metadata, randomKeyId);
+        Assert.assertEquals(randomKeyId, putObjectRequest.getSSEAwsKeyManagementParams().getAwsKmsKeyId());
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        // Same for InitiateMultipartUploadRequest:
+        strategy.configureInitiateMultipartUploadRequest(initUploadRequest, metadata, randomKeyId);
+        Assert.assertEquals(randomKeyId, initUploadRequest.getSSEAwsKeyManagementParams().getAwsKmsKeyId());
+        Assert.assertNull(initUploadRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+    }
+
+    @Test
+    public void testServerSideS3EncryptionStrategy() {
+        S3EncryptionStrategy strategy = new ServerSideS3EncryptionStrategy();
+
+        // This shows that the strategy does *not* build a client:
+        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+
+        // This shows that the strategy sets the SSE algorithm field as expected:
+        strategy.configurePutObjectRequest(putObjectRequest, metadata, "");
+        Assert.assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, metadata.getSSEAlgorithm());
+    }
+
+    @Test
+    public void testNoOpEncryptionStrategy() {
+        S3EncryptionStrategy strategy = new NoOpEncryptionStrategy();
+
+        // This shows that the strategy does *not* build a client:
+        Assert.assertNull(strategy.createEncryptionClient(null, null, "", ""));
+
+        // This shows the request and metadata start with various null objects:
+        Assert.assertNull(metadata.getSSEAlgorithm());
+        Assert.assertNull(putObjectRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+
+        // Act:
+        strategy.configurePutObjectRequest(putObjectRequest, metadata, "");
+
+        // This shows that the request and metadata were not changed:
+        Assert.assertNull(metadata.getSSEAlgorithm());
+        Assert.assertNull(putObjectRequest.getSSEAwsKeyManagementParams());
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
new file mode 100644
index 0000000..d040d34
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/encryption/TestStandardS3EncryptionService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3.encryption;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import java.util.List;
+
+
+public class TestStandardS3EncryptionService {
+    private StandardS3EncryptionService service;
+    private ConfigurationContext context;
+    private String strategyName;
+    private String keyIdOrMaterial;
+    private String region;
+
+    @Before
+    public void setup() throws InitializationException {
+        service = new StandardS3EncryptionService();
+        context = Mockito.mock(ConfigurationContext.class);
+
+        strategyName = StandardS3EncryptionService.STRATEGY_NAME_NONE;
+        keyIdOrMaterial = "test-key-id";
+        region = "us-west-1";
+
+        Mockito.when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_STRATEGY)).thenReturn(new MockPropertyValue(strategyName));
+        Mockito.when(context.getProperty(StandardS3EncryptionService.ENCRYPTION_VALUE)).thenReturn(new MockPropertyValue(keyIdOrMaterial));
+        Mockito.when(context.getProperty(StandardS3EncryptionService.REGION)).thenReturn(new MockPropertyValue(region));
+        service.onConfigured(context);
+    }
+
+    @Test
+    public void testServiceProperties() {
+        Assert.assertEquals(region, service.getRegion());
+        Assert.assertEquals(strategyName, service.getStrategyName());
+    }
+
+    @Test
+    public void testCreateClientReturnsNull() {
+        Assert.assertNull(service.createEncryptionClient(null, null));
+    }
+
+    @Test
+    public void testRequests() {
+        final ObjectMetadata metadata = new ObjectMetadata();
+        final GetObjectRequest getObjectRequest = new GetObjectRequest("", "");
+        final InitiateMultipartUploadRequest initUploadRequest = new InitiateMultipartUploadRequest("", "");
+        final PutObjectRequest putObjectRequest = new PutObjectRequest("", "", "");
+        final UploadPartRequest uploadPartRequest = new UploadPartRequest();
+
+        service.configureGetObjectRequest(getObjectRequest, metadata);
+        Assert.assertNull(getObjectRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        service.configureUploadPartRequest(uploadPartRequest, metadata);
+        Assert.assertNull(uploadPartRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        service.configurePutObjectRequest(putObjectRequest, metadata);
+        Assert.assertNull(putObjectRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+
+        service.configureInitiateMultipartUploadRequest(initUploadRequest, metadata);
+        Assert.assertNull(initUploadRequest.getSSECustomerKey());
+        Assert.assertNull(metadata.getSSEAlgorithm());
+    }
+
+    @Test
+    public void testProperties() {
+        List<PropertyDescriptor> properties = service.getSupportedPropertyDescriptors();
+        Assert.assertEquals(3, properties.size());
+
+        Assert.assertEquals(StandardS3EncryptionService.ENCRYPTION_STRATEGY.getName(), properties.get(0).getName());
+        Assert.assertEquals(StandardS3EncryptionService.ENCRYPTION_VALUE.getName(), properties.get(1).getName());
+        Assert.assertEquals(StandardS3EncryptionService.REGION.getName(), properties.get(2).getName());
+    }
+}