[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-197294536

@mans2singh Looks great. Thanks for getting those changes together, and for the contribution. I'll be merging this into master shortly.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/213
Github user mans2singh commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-197183816

@aldrin @mattyb149 Thanks for your pointers and samples for using the data size validator. I've made the changes and updated the test cases. Please let me know if anything else is required. Thanks again.
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-197151921

@mattyb149 has it. @mans2singh The updates look great. Concerning the validator, how does 6bed3bcaad61c516eacf268400cdd03c0cd58dae look to you? Otherwise, I think we can send this one on its way.
Github user mattyb149 commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-197125258

@mans2singh I think Aldrin was referring to DATA_SIZE_VALIDATOR in org.apache.nifi.processor.util.StandardValidators. If you need a custom min/max size (in bytes), you can use StandardValidators.createDataSizeBoundsValidator(minBytesInclusive, maxBytesInclusive).
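The kind of check such a data-size bounds validator performs can be sketched in plain Java, with no NiFi dependencies. This is only an illustration of the idea (parse a size string, compare against inclusive byte bounds); DataSizeBoundsSketch, toBytes, and withinBounds are hypothetical names, and the unit table is a simplified stand-in for NiFi's DataUnit.

```java
import java.util.Map;

public class DataSizeBoundsSketch {
    // Illustrative unit table; a real data-size parser would cover more units.
    private static final Map<String, Long> UNITS =
            Map.of("B", 1L, "KB", 1024L, "MB", 1024L * 1024, "GB", 1024L * 1024 * 1024);

    // Parse a value such as "25 MB" into a byte count.
    static long toBytes(String value) {
        String[] parts = value.trim().split("\\s+");
        return Long.parseLong(parts[0]) * UNITS.get(parts[1].toUpperCase());
    }

    // The gist of a min/max bounds validator: both bounds are inclusive.
    static boolean withinBounds(String value, long minBytesInclusive, long maxBytesInclusive) {
        long bytes = toBytes(value);
        return bytes >= minBytesInclusive && bytes <= maxBytesInclusive;
    }

    public static void main(String[] args) {
        // e.g. a 1 MB to 50 MB bound, like the buffer property discussed in this thread
        System.out.println(withinBounds("25 MB", 1024L * 1024, 50L * 1024 * 1024)); // true
        System.out.println(withinBounds("51 MB", 1024L * 1024, 50L * 1024 * 1024)); // false
    }
}
```

In the actual processor one would simply pass the min/max byte values to the factory method named above rather than hand-rolling the check.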
Github user mans2singh commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-197122905

@aldrin - I've moved the check for buffer size and rebased as you recommended. I could not find the data size validator you mentioned, though. Please let me know if you have any other recommendations. Thanks
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-197000303

Looking good going to AWS endpoints and services. Good stuff.
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-196956296

@mans2singh Overall, the changes look good, and I appreciate the inclusion of some additional tests. I'm going to do some functional testing, but I think we can get this merged in. Unfortunately, there are a lot of conflicts, so along with the minor points mentioned in the review, would you please also get this rebased so we are good to go for a merge? Thanks!
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r56212999

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
+    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
+    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
+    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
+public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
+
+    /**
+     * Kinesis put record response error message
+     */
+    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
+
+    /**
+     * Kinesis put record response error code
+     */
+    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
+
+    /**
+     * Kinesis put record response record id
+     */
+    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
+
+    public static final List properties = Collections.unmodifiableList(
+        Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY,
+            CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT));
+
+    /**
+     * Max buffer size 1000kb
+     */
+    public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
+
+    @Override
+    protected List getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFileCandidate = session.get();
+        if ( flowFileCandidate == null )
+            return;
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r56212702

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- (same diff as above)
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r56212500

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- (same diff as above)

+    /**
+     * Max buffer size 1000kb

--- End diff --

Now that we have a buffer, should likely call this message size.
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r56212428

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+
+/**
+ * This class provides processor the base class for kinesis firehose
+ */
+public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor {
+
+    public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
+        .name("Amazon Kinesis Firehose Delivery Stream Name")
+        .description("The name of kinesis firehose delivery stream")
+        .expressionLanguageSupported(false)
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .description("Batch size for messages (1-500).")
+        .defaultValue("250")
+        .required(false)
+        .addValidator(StandardValidators.createLongValidator(1, 500, true))
+        .sensitive(false)
+        .build();
+
+    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
+        .name("Max message buffer size (MB)")
+        .description("Max message buffer size (1-50) MB")
+        .defaultValue("1")
+        .required(false)
+        .addValidator(StandardValidators.createLongValidator(1, 50, true))

--- End diff --

Likely want to prefer utilization of the DATA_SIZE_VALIDATOR. Not sure we need to strictly cap this as we are guiding users toward an appropriate path, and if they have conditions where they wish to go above the maximum (as well as supporting hardware) that should be fine.
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r56201575

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- (same diff as above)
Github user mans2singh commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-196646278

@aldrin - I've added the max buffer size limit check while getting flow files from the session, as you recommended, and kept the batch size limit in place. I've also added integration tests to check that messages are sent once the max buffer size is exceeded. Can you please review the code and let me know if it meets your design? Thanks
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-196476035

Hey @mans2singh,

Just wanted to see how the progress was going. We are coming up on release time for 0.6.0 and need to start wrapping things up. If this is something we can bring to a close in the next day or so, great; if not, can we push this off to 0.7.0? Thanks and let me know!
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-195410447

Nope, just send it off to Kinesis and then transfer the files to the appropriate relationship afterward.
Github user mans2singh commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-195409880

@apiri - To keep things simple, we get rid of the batch size and just get one flow file at a time on each invocation of onTrigger. We store the flow files in memory until the max size limit is hit or no more are coming in, and then send the batch held in memory to Kinesis Firehose. Do we have to do anything to "close out" the collected batch?
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-195367460

@mans2singh I think your logic for collecting the flowfiles should probably just continuously grab individual files, one at a time. This continues until one of a couple of scenarios is reached: the max buffer size is met or exceeded (based on the size of each flowfile that has been collected), or there are no more files coming into the processor (one or more flowfiles have been collected but we have not yet eclipsed the max buffer size). In either case, we would close out that collected batch of files and send them on their way much as you had before.

In terms of the 250MB by default: this comes from the default batch size of 250 items with, in the worst-case scenario, each file being 1MB in size. While each of these files is converted to a byte array for sending, they are continuously sitting on the heap. In the event multiple instances of this processor are running, we could quickly consume some big chunks of the heap.

What I am proposing is to either get rid of the batch size (so that property no longer exists) or make it a secondary consideration: we try to receive a certain batch size, but first ensure we do not exceed the configured buffer and second, do not exceed the batch size. The semantics of when batches are sent on their way would be similar to the scenarios above, plus when a certain batch size has been reached. Does that clarify a bit?
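The collection loop described above can be modeled in plain Java, tracking only flowfile sizes. This is a sketch of the idea rather than NiFi code; BufferedBatchSketch and collectBatch are hypothetical names, and a real processor would call session.get() for each flowfile and leave an over-limit file on the incoming queue instead of peeking at a plain queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BufferedBatchSketch {
    /** Drain sizes from the incoming queue into a batch without exceeding maxBufferBytes. */
    static List<Long> collectBatch(Queue<Long> incomingSizes, long maxBufferBytes) {
        List<Long> batch = new ArrayList<>();
        long buffered = 0;
        while (!incomingSizes.isEmpty()) {
            long next = incomingSizes.peek();
            // If the next file would push us over the cap, stop and leave it queued
            // (analogous to transferring it back to the incoming queue).
            if (!batch.isEmpty() && buffered + next > maxBufferBytes) {
                break;
            }
            buffered += incomingSizes.poll();
            batch.add(next);
        }
        // The loop also ends naturally when no more files are coming in,
        // at which point whatever has been collected is sent as one batch.
        return batch;
    }

    public static void main(String[] args) {
        Queue<Long> q = new ArrayDeque<>(List.of(400L, 400L, 400L));
        // With a 1000-byte cap, only the first two 400-byte files fit in the batch.
        System.out.println(collectBatch(q, 1000)); // [400, 400]
        System.out.println(q);                     // [400]
    }
}
```

Note the guard lets a single file larger than the cap through on its own, so oversized files still make progress instead of stalling the queue.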
Github user mans2singh commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-195300721

@apiri - Just wanted to clarify: we will grab a batch of flow files and check whether their combined size has reached the threshold. If it has, we send the batch to the success relationship; if not, we send the flow files back to the work queue with the transfer method you mentioned. This might cause additional delays if the threshold set by the user is not reached for some time, because the flow files are too small or the rate at which flow files are being input is low.

No worries on the time - I know you guys are busy, and I really appreciate your time/advice in helping me work on this project.
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-195173926

@mans2singh Yep. I think we can probably replace the number-of-items batch size (or make it secondary to the buffer size). The idea would be to continuously grab FlowFiles until we've reached this threshold; when the next item arrives that would put us over that limit, we transfer it back to the incoming queue with session.transfer(flowFile) **note the lack of relationship**. We can then create the batch much the same way you have done now. Does that make sense? Thanks for your work on this (and all the other great AWS stuff, very popular extensions), and apologies for the lag in following up on this issue.
Github user mans2singh commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-195172976

@apiri Your recommendation for using buffer size is great. So, should we add another property - total batch bytes - and then send the flow files in batches of that size? If you have any other suggestions/pointers on how this can be implemented, please let me know and I will work on it. Thanks again for your time.
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-194975737

@mans2singh I had a chance to sit down and revisit this. Overall, it looks good, and I was able to test a flow successfully putting to a Kinesis Firehose which aggregated and dumped to S3.

One thing mentioned in my initial review that we still need to cover is how we are handling batching. I do think we need to handle that in a more constrained fashion, given that file sizes could vary widely. With how the processor is currently configured, it could hold up to 250MB in memory by default. Instead, what would your thoughts be on converting this to a buffer size property? If people want batching, they can specify a given memory size (perhaps something like 1 MB by default) and then we can wait until that threshold is hit or no more input flowfiles are available, at which point they are sent off in a batch. If batching is not desired, they can either leave the buffer property empty or specify 0 bytes. Thoughts on this approach?

Ultimately, we are trying to prevent people from inadvertently causing heap exhaustion. With the prescribed approach, people can get as aggressive as they wish with batching and still have a finitely constrained amount of space per instance.
Github user mans2singh commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-188062568 Corrected title of pull request. Please let me know if there is anything else required.