http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml new file mode 100644 index 0000000..77fbe97 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-aws-bundle</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-aws-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-aws-processors</artifactId> + <version>0.3.0-SNAPSHOT</version> + </dependency> + </dependencies> + +</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..64d4975 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,74 @@ +nifi-aws-nar +Copyright 2015 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache HttpComponents + The following NOTICE information applies: + Apache HttpClient + Copyright 1999-2014 The Apache Software Foundation + + Apache HttpCore + Copyright 2005-2014 The Apache Software Foundation + + This project contains annotations derived from JCIP-ANNOTATIONS + Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net + + (ASLv2) Joda Time + The following NOTICE information applies: + This product includes software developed by + Joda.org (http://www.joda.org/). + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. 
+ Copyright (C) 2002 Kevin Atkinson (kev...@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Apache Commons Logging + The following NOTICE information applies: + Apache Commons Logging + Copyright 2003-2013 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2014 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Amazon Web Services SDK + The following NOTICE information applies: + Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. + + This product includes software developed by + Amazon Technologies, Inc (http://www.amazon.com/). + + ********************** + THIRD PARTY COMPONENTS + ********************** + This software includes third party software subject to the following copyrights: + - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty. + - JSON parsing and utility functions from JSON.org - Copyright 2002 JSON.org. + - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc. 
+ + http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml new file mode 100644 index 0000000..b784524 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml @@ -0,0 +1,67 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-aws-bundle</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-aws-processors</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/hello.txt</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java new file mode 100644 index 0000000..a781ff9 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.PropertiesCredentials; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; + +public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("FlowFiles are routed to success after being successfully copied to Amazon S3").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build(); + + public static final Set<Relationship> relationships = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder() + .name("Credentials 
File") + .expressionLanguageSupported(false) + .required(false) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .build(); + public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder() + .name("Access Key") + .expressionLanguageSupported(false) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder() + .name("Secret Key") + .expressionLanguageSupported(false) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() + .name("Region") + .required(true) + .allowableValues(getAvailableRegions()) + .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue()) + .build(); + + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Communications Timeout") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("30 secs") + .build(); + + private volatile ClientType client; + + private static AllowableValue createAllowableValue(final Regions regions) { + return new AllowableValue(regions.getName(), regions.getName(), regions.getName()); + } + + private static AllowableValue[] getAvailableRegions() { + final List<AllowableValue> values = new ArrayList<>(); + for (final Regions regions : Regions.values()) { + values.add(createAllowableValue(regions)); + } + + return (AllowableValue[]) values.toArray(new AllowableValue[values.size()]); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext)); + + final boolean accessKeySet = 
validationContext.getProperty(ACCESS_KEY).isSet(); + final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet(); + if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) { + problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build()); + } + + final boolean credentialsFileSet = validationContext.getProperty(CREDENTAILS_FILE).isSet(); + if ((secretKeySet || accessKeySet) && credentialsFileSet) { + problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build()); + } + + return problems; + } + + protected ClientConfiguration createConfiguration(final ProcessContext context) { + final ClientConfiguration config = new ClientConfiguration(); + config.setMaxConnections(context.getMaxConcurrentTasks()); + config.setMaxErrorRetry(0); + config.setUserAgent("NiFi"); + + final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + config.setConnectionTimeout(commsTimeout); + config.setSocketTimeout(commsTimeout); + + return config; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context)); + this.client = awsClient; + + // if the processor supports REGION, get the configured region. 
+ if (getSupportedPropertyDescriptors().contains(REGION)) { + final String region = context.getProperty(REGION).getValue(); + if (region != null) { + client.setRegion(Region.getRegion(Regions.fromName(region))); + } + } + } + + protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config); + + protected ClientType getClient() { + return client; + } + + protected AWSCredentials getCredentials(final ProcessContext context) { + final String accessKey = context.getProperty(ACCESS_KEY).getValue(); + final String secretKey = context.getProperty(SECRET_KEY).getValue(); + + final String credentialsFile = context.getProperty(CREDENTAILS_FILE).getValue(); + + if (credentialsFile != null) { + try { + return new PropertiesCredentials(new File(credentialsFile)); + } catch (final IOException ioe) { + throw new ProcessException("Could not read Credentials File", ioe); + } + } + + if (accessKey != null && secretKey != null) { + return new BasicAWSCredentials(accessKey, secretKey); + } + + return new AnonymousAWSCredentials(); + } + + protected boolean isEmpty(final String value) { + return value == null || value.trim().equals(""); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java new file mode 100644 index 0000000..76880ef --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.aws.s3;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSProcessor;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.EmailAddressGrantee;
import com.amazonaws.services.s3.model.Grantee;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;

/**
 * Common base for the S3 processors: declares the bucket, object key, owner
 * and per-permission user-list properties, creates the {@link AmazonS3Client},
 * and turns the user-list properties into an {@link AccessControlList}.
 */
public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3Client> {

    public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new PropertyDescriptor.Builder()
            .name("FullControl User List")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Full Control for an object")
            .defaultValue("${s3.permissions.full.users}")
            .build();
    public static final PropertyDescriptor READ_USER_LIST = new PropertyDescriptor.Builder()
            .name("Read Permission User List")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Read Access for an object")
            .defaultValue("${s3.permissions.read.users}")
            .build();
    public static final PropertyDescriptor WRITE_USER_LIST = new PropertyDescriptor.Builder()
            .name("Write Permission User List")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Write Access for an object")
            .defaultValue("${s3.permissions.write.users}")
            .build();
    public static final PropertyDescriptor READ_ACL_LIST = new PropertyDescriptor.Builder()
            .name("Read ACL User List")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to read the Access Control List for an object")
            .defaultValue("${s3.permissions.readacl.users}")
            .build();
    public static final PropertyDescriptor WRITE_ACL_LIST = new PropertyDescriptor.Builder()
            .name("Write ACL User List")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to change the Access Control List for an object")
            .defaultValue("${s3.permissions.writeacl.users}")
            .build();
    public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
            .name("Owner")
            .required(false)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("The Amazon ID to use for the object's owner")
            .defaultValue("${s3.owner}")
            .build();
    public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder()
            .name("Bucket")
            .expressionLanguageSupported(true)
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
            .name("Object Key")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true)
            .defaultValue("${filename}")
            .build();

    /**
     * Creates the S3 client from the supplied credentials and configuration.
     * The process context is not consulted.
     */
    @Override
    protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
        return new AmazonS3Client(credentials, config);
    }

    /**
     * Maps a single identifier to a grantee: an identifier containing
     * {@code '@'} becomes an e-mail grantee, anything else a canonical-ID
     * grantee.
     *
     * @param value the identifier, may be {@code null} or blank
     * @return the grantee, or {@code null} when the identifier is empty
     */
    protected Grantee createGrantee(final String value) {
        if (isEmpty(value)) {
            return null;
        }

        final boolean looksLikeEmail = value.contains("@");
        return looksLikeEmail ? new EmailAddressGrantee(value) : new CanonicalGrantee(value);
    }

    /**
     * Splits a comma-separated list of identifiers and maps each trimmed
     * entry through {@link #createGrantee(String)}, dropping entries for
     * which no grantee is produced.
     *
     * @param value the comma-separated identifiers, may be {@code null} or blank
     * @return the grantees in list order; an empty list when the value is empty
     */
    protected final List<Grantee> createGrantees(final String value) {
        if (isEmpty(value)) {
            return Collections.emptyList();
        }

        final List<Grantee> result = new ArrayList<>();
        for (final String rawToken : value.split(",")) {
            final Grantee candidate = createGrantee(rawToken.trim());
            if (candidate != null) {
                result.add(candidate);
            }
        }
        return result;
    }

    /**
     * Builds the access control list for an object from the Owner property
     * and the five user-list properties, each evaluated against the given
     * FlowFile. Permissions are granted in the order Full Control, Read,
     * Write, Read ACL, Write ACL.
     *
     * @param context the process context holding the properties
     * @param flowFile the FlowFile whose attributes feed the expressions
     * @return the populated access control list
     */
    protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) {
        final AccessControlList acl = new AccessControlList();

        final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
        if (!isEmpty(ownerId)) {
            final Owner owner = new Owner();
            owner.setId(ownerId);
            acl.setOwner(owner);
        }

        grantAll(acl, context, flowFile, FULL_CONTROL_USER_LIST, Permission.FullControl);
        grantAll(acl, context, flowFile, READ_USER_LIST, Permission.Read);
        grantAll(acl, context, flowFile, WRITE_USER_LIST, Permission.Write);
        grantAll(acl, context, flowFile, READ_ACL_LIST, Permission.ReadAcp);
        grantAll(acl, context, flowFile, WRITE_ACL_LIST, Permission.WriteAcp);

        return acl;
    }

    /** Grants one permission to every grantee named by the given user-list property. */
    private void grantAll(final AccessControlList acl, final ProcessContext context, final FlowFile flowFile,
            final PropertyDescriptor userList, final Permission permission) {
        final String users = context.getProperty(userList).evaluateAttributeExpressions(flowFile).getValue();
        for (final Grantee grantee : createGrantees(users)) {
            acl.grantPermission(grantee, permission);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java new file mode 100644 index 0000000..2406b67 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; + +@SupportsBatching +@SeeAlso({PutS3Object.class}) +@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"}) +@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile") 
+@WritesAttributes({ + @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"), + @WritesAttribute(attribute = "path", description = "The path of the file"), + @WritesAttribute(attribute = "absolute.path", description = "The path of the file"), + @WritesAttribute(attribute = "filename", description = "The name of the file"), + @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the file"), + @WritesAttribute(attribute = "hash.algorithm", description = "MD5"), + @WritesAttribute(attribute = "mime.type", description = "If S3 provides the content type/MIME type, this attribute will hold that file"), + @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"), + @WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"), + @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"), + @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),}) +public class FetchS3Object extends AbstractS3Processor { + + public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() + .name("Version") + .description("The Version of the Object to download") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); + + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( + Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, VERSION_ID)); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == 
null) { + return; + } + + final long startNanos = System.nanoTime(); + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue(); + + final AmazonS3 client = getClient(); + final GetObjectRequest request; + if (versionId == null) { + request = new GetObjectRequest(bucket, key); + } else { + request = new GetObjectRequest(bucket, key, versionId); + } + + final Map<String, String> attributes = new HashMap<>(); + try (final S3Object s3Object = client.getObject(request)) { + flowFile = session.importFrom(s3Object.getObjectContent(), flowFile); + attributes.put("s3.bucket", s3Object.getBucketName()); + + final ObjectMetadata metadata = s3Object.getObjectMetadata(); + if (metadata.getContentDisposition() != null) { + final String fullyQualified = metadata.getContentDisposition(); + final int lastSlash = fullyQualified.lastIndexOf("/"); + if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) { + attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash)); + attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified); + attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1)); + } else { + attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition()); + } + } + if (metadata.getContentMD5() != null) { + attributes.put("hash.value", metadata.getContentMD5()); + attributes.put("hash.algorithm", "MD5"); + } + if (metadata.getContentType() != null) { + attributes.put(CoreAttributes.MIME_TYPE.key(), metadata.getContentType()); + } + if (metadata.getETag() != null) { + attributes.put("s3.etag", metadata.getETag()); + } + if (metadata.getExpirationTime() != null) { + attributes.put("s3.expirationTime", 
String.valueOf(metadata.getExpirationTime().getTime())); + } + if (metadata.getExpirationTimeRuleId() != null) { + attributes.put("s3.expirationTimeRuleId", metadata.getExpirationTimeRuleId()); + } + if (metadata.getUserMetadata() != null) { + attributes.putAll(metadata.getUserMetadata()); + } + if (metadata.getVersionId() != null) { + attributes.put("s3.version", metadata.getVersionId()); + } + } catch (final IOException | AmazonClientException ioe) { + getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ioe}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + if (!attributes.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, attributes); + } + + session.transfer(flowFile, REL_SUCCESS); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis}); + session.getProvenanceReporter().receive(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java new file mode 100644 index 0000000..24c82dd --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +import 
com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.StorageClass; + +@SupportsBatching +@SeeAlso({FetchS3Object.class}) +@Tags({"Amazon", "S3", "AWS", "Archive", "Put"}) +@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket") +@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", + value = "The value of a User-Defined Metadata field to add to the S3 Object", + description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", + supportsExpressionLanguage = true) +@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object") +@WritesAttributes({ + @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), + @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), + @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set") +}) +public class PutS3Object extends AbstractS3Processor { + + public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder() + .name("Expiration Time Rule") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder() + .name("Storage Class") + .required(true) + .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name()) + .defaultValue(StorageClass.Standard.name()) + .build(); + + public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList( + Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, + FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER)); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + + final AmazonS3 s3 = getClient(); + final FlowFile ff = flowFile; + final Map<String, String> attributes = new HashMap<>(); + try { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + final ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key())); + objectMetadata.setContentLength(ff.getSize()); + + final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue(); + if (expirationRule != null) { + objectMetadata.setExpirationTimeRuleId(expirationRule); + } + + final Map<String, String> userMetadata = new HashMap<>(); + for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { + final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue(); + userMetadata.put(entry.getKey().getName(), value); + } + } + + if (!userMetadata.isEmpty()) { + objectMetadata.setUserMetadata(userMetadata); + } + + final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata); + request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue())); + final AccessControlList acl = createACL(context, ff); + if (acl != null) { + request.setAccessControlList(acl); + } + + final PutObjectResult result = s3.putObject(request); + if (result.getVersionId() != null) { + attributes.put("s3.version", result.getVersionId()); + } + + attributes.put("s3.etag", result.getETag()); + + final Date expiration = result.getExpirationTime(); + if (expiration != null) { + attributes.put("s3.expiration", expiration.toString()); + } + } + } + }); + + if (!attributes.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, attributes); + } + session.transfer(flowFile, REL_SUCCESS); + + final String url = "http://" + bucket + ".s3.amazonaws.com/" + key; + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, url, millis); + + getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[]{ff, millis}); + } catch (final ProcessException | AmazonClientException pe) { + getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[]{flowFile, pe}); + session.transfer(flowFile, REL_FAILURE); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java new file mode 100644 index 0000000..5b57647 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.sns; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.services.sns.AmazonSNSClient; + +public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> { + + protected static final AllowableValue ARN_TYPE_TOPIC + = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic"); + protected static final AllowableValue ARN_TYPE_TARGET + = new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber"); + + public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder() + .name("Amazon Resource Name (ARN)") + .description("The name of the resource to which notifications should be published") + .expressionLanguageSupported(true) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder() + .name("ARN Type") + .description("The type of Amazon Resource Name that is being used.") + .expressionLanguageSupported(false) + .required(true) + .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET) + .defaultValue(ARN_TYPE_TOPIC.getValue()) + .build(); + + @Override + protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + return new AmazonSNSClient(credentials, config); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java new file mode 100644 index 0000000..7d42703 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.sns; + +import java.io.ByteArrayOutputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.sqs.GetSQS; +import org.apache.nifi.processors.aws.sqs.PutSQS; + +import com.amazonaws.services.sns.AmazonSNSClient; +import com.amazonaws.services.sns.model.MessageAttributeValue; +import com.amazonaws.services.sns.model.PublishRequest; + +@SupportsBatching +@SeeAlso({GetSQS.class, PutSQS.class}) +@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"}) +@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service") +public class PutSNS extends AbstractSNSProcessor { + + public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder() + .name("Character Set") + .description("The character set in which the FlowFile's content is encoded") + .defaultValue("UTF-8") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .required(true) + .build(); + public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder() + .name("Use JSON Structure") + .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'." + + " Additional elements can be used to send different messages to different protocols. 
See the Amazon" + + " SNS Documentation for more information.") + .defaultValue("false") + .allowableValues("true", "false") + .required(true) + .build(); + public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder() + .name("E-mail Subject") + .description("The optional subject to use for any subscribers that are subscribed via E-mail") + .expressionLanguageSupported(true) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( + Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, TIMEOUT, + USE_JSON_STRUCTURE, CHARACTER_ENCODING)); + + public static final int MAX_SIZE = 256 * 1024; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .dynamic(true) + .build(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + if (flowFile.getSize() > MAX_SIZE) { + getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue()); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + session.exportTo(flowFile, baos); + final String message = new String(baos.toByteArray(), charset); + + final AmazonSNSClient client = getClient(); 
+ final PublishRequest request = new PublishRequest(); + request.setMessage(message); + + if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) { + request.setMessageStructure("json"); + } + + final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue(); + final String arnType = context.getProperty(ARN_TYPE).getValue(); + if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) { + request.setTopicArn(arn); + } else { + request.setTargetArn(arn); + } + + final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue(); + if (subject != null) { + request.setSubject(subject); + } + + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) { + final MessageAttributeValue value = new MessageAttributeValue(); + value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue()); + value.setDataType("String"); + request.addMessageAttributesEntry(entry.getKey().getName(), value); + } + } + + try { + client.publish(request); + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, arn); + getLogger().info("Successfully published notification for {}", new Object[]{flowFile}); + } catch (final Exception e) { + getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java new file mode 100644 index 0000000..3cee02d --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.sqs; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.services.sqs.AmazonSQSClient; + +public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> { + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The maximum number of messages to send in a single network request") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("25") + .build(); + + public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder() + .name("Queue URL") + .description("The URL of the queue to act upon") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + + @Override + protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + return new AmazonSQSClient(credentials, config); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java new file mode 100644 index 0000000..65e020d --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java @@ 
-0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sqs; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; + +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; + +@SupportsBatching +@SeeAlso({GetSQS.class, PutSQS.class}) +@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"}) +@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue") +public class DeleteSQS extends AbstractSQSProcessor { + + public static final PropertyDescriptor RECEIPT_HANDLE 
= new PropertyDescriptor.Builder() + .name("Receipt Handle") + .description("The identifier that specifies the receipt of the message") + .expressionLanguageSupported(true) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("${sqs.receipt.handle}") + .build(); + + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( + Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT)); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + List<FlowFile> flowFiles = session.get(1); + if (flowFiles.isEmpty()) { + return; + } + + final FlowFile firstFlowFile = flowFiles.get(0); + final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue(); + + final AmazonSQSClient client = getClient(); + final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(); + request.setQueueUrl(queueUrl); + + final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size()); + + for (final FlowFile flowFile : flowFiles) { + final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); + entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue()); + entries.add(entry); + } + + request.setEntries(entries); + + try { + client.deleteMessageBatch(request); + getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()}); + session.transfer(flowFiles, REL_SUCCESS); + } catch (final Exception e) { + getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e}); + session.transfer(flowFiles, REL_FAILURE); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java new file mode 100644 index 0000000..7c2dd2d --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.sqs; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; + +@SupportsBatching +@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"}) +@SeeAlso({PutSQS.class, DeleteSQS.class}) +@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue") +@WritesAttributes({ + @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"), + @WritesAttribute(attribute = "hash.algorithm", description = "MD5"), + @WritesAttribute(attribute = "sqs.message.id", description 
= "The unique identifier of the SQS message"),
+    @WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue")
+})
+public class GetSQS extends AbstractSQSProcessor {
+
+    // Each trigger receives up to "Batch Size" messages from the configured queue, creates one
+    // FlowFile per message and, when "Auto Delete Messages" is true, deletes the received
+    // messages from the queue after the session has been committed.
+
+    /** Character set used to convert the message body text into the FlowFile's bytes. */
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("The Character Set that should be used to encode the textual content of the SQS message")
+            .required(true)
+            .defaultValue("UTF-8")
+            .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0]))
+            .build();
+
+    /** Whether received messages are deleted from the queue by this processor. */
+    public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder()
+            .name("Auto Delete Messages")
+            .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    /** Visibility timeout passed (in seconds) on every receive request. */
+    public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Visibility Timeout")
+            .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .defaultValue("15 mins")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    /** Maximum number of messages requested per receive call; validated to lie in 1..10. */
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("The maximum number of messages to receive in a single network request")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1L, 10L, true))
+            .defaultValue("10")
+            .build();
+
+    /** Variant of the inherited QUEUE_URL with Expression Language disabled (no incoming FlowFile to evaluate against). */
+    public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(QUEUE_URL)
+            .expressionLanguageSupported(false)
+            .build();
+
+    /** Properties exposed by this processor, in display order. */
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /** Only a success relationship is exposed. */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    /**
+     * Receives one batch of messages, turns each into a FlowFile routed to success and,
+     * if auto-delete is enabled, commits the session and then deletes the batch from the queue.
+     * Yields when the receive call fails or returns no messages.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue();
+
+        final AmazonSQSClient client = getClient();
+
+        final ReceiveMessageRequest request = new ReceiveMessageRequest();
+        request.setAttributeNames(Collections.singleton("All"));
+        request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
+        request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
+        request.setQueueUrl(queueUrl);
+
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+
+        final ReceiveMessageResult result;
+        try {
+            result = client.receiveMessage(request);
+        } catch (final Exception e) {
+            getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e});
+            context.yield();
+            return;
+        }
+
+        final List<Message> messages = result.getMessages();
+        if (messages.isEmpty()) {
+            context.yield();
+            return;
+        }
+
+        final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean();
+
+        for (final Message message : messages) {
+            FlowFile flowFile = session.create();
+
+            final Map<String, String> attributes = new HashMap<>();
+            for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) {
+                attributes.put("sqs." + entry.getKey(), entry.getValue());
+            }
+
+            for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
+                // getStringValue() can be null (presumably for non-string data types - TODO confirm);
+                // never hand a null attribute value to the session.
+                final String stringValue = entry.getValue().getStringValue();
+                if (stringValue != null) {
+                    attributes.put("sqs." + entry.getKey(), stringValue);
+                }
+            }
+
+            attributes.put("hash.value", message.getMD5OfBody());
+            attributes.put("hash.algorithm", "md5");
+            attributes.put("sqs.message.id", message.getMessageId());
+            attributes.put("sqs.receipt.handle", message.getReceiptHandle());
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    out.write(message.getBody().getBytes(charset));
+                }
+            });
+
+            session.transfer(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().receive(flowFile, queueUrl);
+
+            getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile});
+        }
+
+        if (autoDelete) {
+            // If we want to auto-delete messages, we must first commit the session to ensure that the data
+            // is persisted in NiFi's repositories.
+            session.commit();
+
+            final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest();
+            deleteRequest.setQueueUrl(queueUrl);
+            final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
+            for (final Message message : messages) {
+                final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry();
+                entry.setId(message.getMessageId());
+                entry.setReceiptHandle(message.getReceiptHandle());
+                deleteRequestEntries.add(entry);
+            }
+
+            deleteRequest.setEntries(deleteRequestEntries);
+
+            try {
+                // A batch call can fail for individual entries without throwing, so the
+                // per-entry failure list has to be inspected as well.
+                final int failedDeletions = client.deleteMessageBatch(deleteRequest).getFailed().size();
+                if (failedDeletions > 0) {
+                    getLogger().error("Received {} messages from Amazon SQS but failed to delete {} of them; these messages"
+                            + " may be duplicated", new Object[]{messages.size(), failedDeletions});
+                }
+            } catch (final Exception e) {
+                getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages"
+                        + " may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e});
+            }
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
new file mode 100644
index 0000000..3961f32
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
+
+/**
+ * Sends the content of each incoming FlowFile as the body of one SQS message. Every dynamic
+ * (user-defined) property becomes a String-typed Message Attribute on the message.
+ */
+@SupportsBatching
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"})
+@SeeAlso({GetSQS.class, DeleteSQS.class})
+@CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue")
+@DynamicProperty(name = "The name of a Message Attribute to add to the message", value = "The value of the Message Attribute",
+        description = "Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "
+                + "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage = true)
+public class PutSQS extends AbstractSQSProcessor {
+
+    /** Delay, sent to SQS in whole seconds, before the message becomes available to consumers. */
+    public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder()
+            .name("Delay")
+            .description("The amount of time to delay the message before it becomes available to consumers")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("0 secs")
+            .build();
+
+    /** Properties exposed by this processor, in display order. */
+    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
+            Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, REGION, DELAY, TIMEOUT));
+
+    /** Dynamic properties captured when the processor is scheduled; replaced wholesale, never mutated in place. */
+    private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /** Accepts any dynamic property name as an optional, non-empty, Expression-Language-enabled property. */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .expressionLanguageSupported(true)
+                .required(false)
+                .dynamic(true)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .build();
+    }
+
+    /** Collects the dynamic properties configured on this processor so onTrigger can map them to Message Attributes. */
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        final List<PropertyDescriptor> dynamicProperties = new ArrayList<>();
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                dynamicProperties.add(descriptor);
+            }
+        }
+
+        // Assign the field only once the list is complete so readers never observe a partially filled list.
+        userDefinedProperties = Collections.unmodifiableList(dynamicProperties);
+    }
+
+    /**
+     * Sends one FlowFile as a single-entry message batch. The FlowFile goes to failure when the
+     * call throws or when SQS reports the entry as failed, and to success otherwise, in which
+     * case a provenance SEND event with the elapsed time is recorded.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final AmazonSQSClient client = getClient();
+        final SendMessageBatchRequest request = new SendMessageBatchRequest();
+        final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
+        request.setQueueUrl(queueUrl);
+
+        final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
+
+        final SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
+        entry.setId(flowFile.getAttribute("uuid"));
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        session.exportTo(flowFile, baos);
+        // Decode with an explicit charset; the no-arg toString() depends on the JVM's platform default.
+        final String flowFileContent = new String(baos.toByteArray(), StandardCharsets.UTF_8);
+        entry.setMessageBody(flowFileContent);
+
+        final Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
+
+        for (final PropertyDescriptor descriptor : userDefinedProperties) {
+            final MessageAttributeValue mav = new MessageAttributeValue();
+            mav.setDataType("String");
+            mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
+            messageAttributes.put(descriptor.getName(), mav);
+        }
+
+        entry.setMessageAttributes(messageAttributes);
+        entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
+        entries.add(entry);
+
+        request.setEntries(entries);
+
+        final boolean entryRejected;
+        try {
+            // A batch call can report per-entry failures without throwing, so check the failure list too.
+            entryRejected = !client.sendMessageBatch(request).getFailed().isEmpty();
+        } catch (final Exception e) {
+            getLogger().error("Failed to send messages to Amazon SQS due to {}; routing to failure", new Object[]{e});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        if (entryRejected) {
+            getLogger().error("Failed to send message to Amazon SQS for {} because the batch entry was rejected; routing to failure", new Object[]{flowFile});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        getLogger().info("Successfully published message to Amazon SQS for {}", new Object[]{flowFile});
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        session.getProvenanceReporter().send(flowFile, queueUrl, transmissionMillis);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..4f2405c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,20 @@
+# 
Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.aws.s3.FetchS3Object
+org.apache.nifi.processors.aws.s3.PutS3Object
+org.apache.nifi.processors.aws.sns.PutSNS
+org.apache.nifi.processors.aws.sqs.GetSQS
+org.apache.nifi.processors.aws.sqs.PutSQS
+org.apache.nifi.processors.aws.sqs.DeleteSQS
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
new file mode 100644
index 0000000..0321514
--- /dev/null
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Manual integration test for FetchS3Object. It is ignored by default because it needs a
+ * credentials file in the user's home directory and an existing bucket and object.
+ */
+@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
+public class TestFetchS3Object {
+
+    /** Location of the credentials properties file handed to the processor. */
+    private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
+
+    /** Fetches folder/1.txt from the test bucket and checks it against the local hello.txt fixture. */
+    @Test
+    public void testGet() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
+        runner.setProperty(FetchS3Object.BUCKET, "anonymous-test-bucket-00000000");
+        runner.setProperty(FetchS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(FetchS3Object.KEY, "folder/1.txt");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("start", "0");
+
+        runner.enqueue(new byte[0], attrs);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
+        final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
+        final MockFlowFile out = ffs.iterator().next();
+
+        // Compare raw bytes so the result does not depend on the platform default charset.
+        final byte[] expectedBytes = Files.readAllBytes(Paths.get("src/test/resources/hello.txt"));
+        out.assertContentEquals(expectedBytes);
+
+        // Dump the attributes of the fetched FlowFile for manual inspection.
+        for (final Map.Entry<String, String> entry : out.getAttributes().entrySet()) {
+            System.out.println(entry.getKey() + " : " + entry.getValue());
+        }
+    }
+
+}