[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-19 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-197294536
  
@mans2singh Looks great, thanks for getting those changes together and the 
contribution.  I'll be merging this into master shortly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/213




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-16 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-197183816
  
@aldrin @mattyb149 Thanks for your pointers and samples for using the data 
size validator.  I've made the changes and updated the test cases.  Please let 
me know if anything else is required.  Thanks again.




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-15 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-197151921
  
@mattyb149 has it.

@mans2singh the updates look great.  Concerning the validator, how does 
6bed3bcaad61c516eacf268400cdd03c0cd58dae look to you?

Otherwise, I think we can send this one on its way.




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-15 Thread mattyb149
Github user mattyb149 commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-197125258
  
@mans2singh I think Aldrin was referring to DATA_SIZE_VALIDATOR in 
org.apache.nifi.processor.util.StandardValidators. If you need custom min/max 
size (in bytes), you can use 
StandardValidators.createDataSizeBoundsValidator(minBytesInclusive, 
maxBytesInclusive) 
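
As a rough illustration of what a bounded data size check does, here is a stand-alone sketch. It is not NiFi's implementation of DATA_SIZE_VALIDATOR or createDataSizeBoundsValidator; the class name, the accepted units, and the helper methods are assumptions made up for this example.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Stand-alone sketch of a bounded data size check; NOT NiFi's own code.
final class DataSizeBoundsSketch {

    // Assumed input shape: digits, optional spaces, then B, KB, MB or GB (powers of 1024).
    private static final Pattern DATA_SIZE = Pattern.compile("(\\d+)\\s*(B|KB|MB|GB)");

    /** Parses a data size string into bytes, or throws IllegalArgumentException. */
    static long parseBytes(String value) {
        Matcher m = DATA_SIZE.matcher(value.trim().toUpperCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a data size: " + value);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "KB": return amount * 1024L;
            case "MB": return amount * 1024L * 1024L;
            case "GB": return amount * 1024L * 1024L * 1024L;
            default:   return amount; // plain bytes
        }
    }

    /** True when the value parses and lies within [minBytesInclusive, maxBytesInclusive]. */
    static boolean isWithinBounds(String value, long minBytesInclusive, long maxBytesInclusive) {
        try {
            long bytes = parseBytes(value);
            return bytes >= minBytesInclusive && bytes <= maxBytesInclusive;
        } catch (IllegalArgumentException e) {
            return false; // unparseable input is simply invalid
        }
    }

    public static void main(String[] args) {
        long oneMb = 1024L * 1024L;
        System.out.println(isWithinBounds("1 MB", oneMb, 50 * oneMb));
        System.out.println(isWithinBounds("51 MB", oneMb, 50 * oneMb));
    }
}
```

In the processor itself, the validator from StandardValidators would be attached to the property descriptor via addValidator in place of the createLongValidator call.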




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-15 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-197122905
  
@aldrin - I've moved the check for buffer size and rebased as you 
recommended.  I could not find the data size validator that you mentioned, 
though. Please let me know if you have any other recommendations.  Thanks




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-15 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-197000303
  
Looks good when run against AWS endpoints and services.  Good stuff.




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-15 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-196956296
  
@mans2singh Overall, the changes look good and I appreciate the inclusion of 
some additional tests.  I am going to do some functional testing, but I think we 
can get this merged in.  There are, unfortunately, a lot of conflicts, so along 
with addressing the minor points mentioned in the review, would you also be 
able to rebase this so we are good to go for a merge?

Thanks!




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-15 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r56212999
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
+    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
+    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
+    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
+public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
+
+    /**
+     * Kinesis put record response error message
+     */
+    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
+
+    /**
+     * Kinesis put record response error code
+     */
+    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
+
+    /**
+     * Kinesis put record response record id
+     */
+    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
+
+    public static final List properties = Collections.unmodifiableList(
+            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
+              PROXY_HOST,PROXY_HOST_PORT));
+
+    /**
+     * Max buffer size 1000kb
+     */
+    public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
+
+    @Override
+    protected List getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        FlowFile flowFileCandidate = session.get();
+        if ( flowFileCandidate == null )
+            return;
+
+  

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-15 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r56212702
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
@@ -0,0 +1,180 @@

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-15 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r56212500
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. "
+    + "In order to send data to firehose, the firehose delivery stream name has to be specified.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"),
+    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"),
+    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")})
+public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
+
+    /**
+     * Kinesis put record response error message
+     */
+    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
+
+    /**
+     * Kinesis put record response error code
+     */
+    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
+
+    /**
+     * Kinesis put record response record id
+     */
+    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
+
+    public static final List properties = Collections.unmodifiableList(
+            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
+              PROXY_HOST,PROXY_HOST_PORT));
+
+    /**
+     * Max buffer size 1000kb
--- End diff --

Now that we have a buffer, we should likely call this the message size.




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-15 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r56212428
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+
+/**
+ * This class provides processor the base class for kinesis firehose
+ */
+public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor {
+
+    public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
+            .name("Amazon Kinesis Firehose Delivery Stream Name")
+            .description("The name of kinesis firehose delivery stream")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("Batch size for messages (1-500).")
+            .defaultValue("250")
+            .required(false)
+            .addValidator(StandardValidators.createLongValidator(1, 500, true))
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder()
+            .name("Max message buffer size (MB)")
+            .description("Max message buffer size (1-50) MB")
+            .defaultValue("1")
+            .required(false)
+            .addValidator(StandardValidators.createLongValidator(1, 50, true))
--- End diff --

Likely want to prefer the DATA_SIZE_VALIDATOR here.  I am not sure we need to 
strictly cap this: we are guiding users toward an appropriate path, and if they 
have conditions where they wish to go above the maximum (and the hardware to 
support it), that should be fine.




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-15 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r56201575
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
@@ -0,0 +1,180 @@

[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-14 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-196646278
  
@aldrin - I've added a max buffer size limit check while getting flow files 
from the session, as you recommended.  I've also kept the batch size limit 
in place, and added integration tests to check that the messages are sent once 
the max buffer size is exceeded. Can you please review the code and let me know 
if it meets your design?
Thanks




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-14 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-196476035
  
Hey @mans2singh,

Just wanted to see how the progress was going.  We are coming up on release 
time for 0.6.0 and need to start wrapping things up.  If this is something we 
can bring to a close in the next day or so, great; if not, can we push this off 
to 0.7.0?

Thanks and let me know! 




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-11 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-195410447
  
Nope, just send it off to Kinesis and then transfer the files to the 
appropriate relationship afterward.
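
A minimal sketch of that "send, then route" step, under stated assumptions: the Entry class below is a hypothetical stand-in for a per-record batch response entry (not the AWS SDK type), and entries are assumed to come back in the same order as the records that were sent. Records whose entry carries an error code would go to failure, the rest to success.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch; Entry is a hypothetical stand-in, not the AWS SDK class.
final class ResponseRoutingSketch {

    /** One per-record result of a batch put: a record id on success, an error code on failure. */
    static final class Entry {
        final String recordId;
        final String errorCode;

        Entry(String recordId, String errorCode) {
            this.recordId = recordId;
            this.errorCode = errorCode;
        }
    }

    /** Returns the positions of the records whose response entry carries an error code. */
    static List<Integer> failedIndexes(List<Entry> entries) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            String code = entries.get(i).errorCode;
            if (code != null && !code.isEmpty()) {
                failed.add(i); // this record would be routed to failure
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Entry> entries = Arrays.asList(
                new Entry("id-1", null),
                new Entry(null, "ServiceUnavailable"),
                new Entry("id-3", null));
        System.out.println("Failed positions: " + failedIndexes(entries));
    }
}
```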




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-11 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-195409880
  
@apiri - To make things simple, we get rid of the batch size and just get one 
flow file at a time on each invocation of onTrigger.  We store the flow files 
in memory until the max size limit is reached or no more are coming in, and then 
send the batch held in memory to Kinesis Firehose.  Do we have to do anything 
to "close out" the collected batch?




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-11 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-195367460
  
@mans2singh I think your logic for collecting the flowfiles should probably 
just continuously grab individual files one at a time.  This continues until 
one of two scenarios is reached: the max buffer size is met or exceeded (based 
on the size of each flowfile that has been collected), or there are no more 
files coming into the processor (one or more flowfiles have been collected but 
we have not yet eclipsed the max buffer size).  In either case, we would close 
out that collected batch of files and send them on their way much as you had 
before.

In terms of the 250MB by default, this comes from the default batch size of 
250 and the worst-case scenario in which each file is 1MB in size.  While each 
of these files is converted to a byte array for sending, they all sit on the 
heap.  If multiple instances of this processor are running, we could quickly 
consume big chunks of the heap.  What I am proposing is to either get rid of 
the batch size (so that property would no longer exist) or make it a secondary 
consideration: we try to reach a certain batch size, but first ensure we do 
not exceed the configured buffer and second that we do not exceed the batch 
size.  Batches would be sent on their way under the same scenarios as above, 
plus when the batch size has been reached.
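
The worst-case figure can be worked out directly.  The sketch below is only 
the arithmetic from the paragraph above, assuming 1MB means 1024 * 1024 bytes; 
the class and method names are hypothetical.

```java
// Hypothetical helper: upper bound on heap held by in-flight batches.
final class HeapEstimate {

    /**
     * Worst case: every record in the batch is at the maximum record size, and
     * every concurrently running instance holds a full batch at the same time.
     */
    static long worstCaseBytes(int batchSize, long maxRecordBytes, int instances) {
        long perInstance = Math.multiplyExact((long) batchSize, maxRecordBytes);
        return Math.multiplyExact(perInstance, (long) instances);
    }
}
```

For example, worstCaseBytes(250, 1024 * 1024, 1) gives 262144000 bytes 
(250MB), and four instances running at once would quadruple that.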

Does that clarify a bit? 




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-11 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-195300721
  
@apiri - Just wanted to clarify: we will grab a batch of flow files and 
check whether the combined size threshold has been reached.  If it has, we send 
the batch to the success relationship; if not, we send the flow files back to 
the work queue with the transfer method you mentioned.  This might cause 
additional delays if the threshold set by the user is not reached for some 
time, either because the flow files are too small or because the rate at which 
flow files arrive is low.
No worries on the time - I know you guys are busy and I really appreciate 
your time and advice in helping me work on this project.




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-10 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-195173926
  
@mans2singh Yep.  I think we can probably replace the number of items in a 
batch (or make this secondary to the buffer size).  The idea would be to 
continuously grab FlowFiles until we've reached this threshold; then, when the 
next item arrives that would put us over that limit, we transfer it back to 
the incoming queue with session.transfer(flowFile) (**note the lack of 
relationship**).  We can then create the batch much the same way you have done 
now.  Does that make sense?
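
A minimal sketch of that push-back idea, under stated assumptions: the 
incoming queue is modeled as a Deque of flowfile sizes in bytes, and 
addFirst() stands in for session.transfer(flowFile) with no relationship.  
That is a simplification, since the real call hands the FlowFile back to its 
incoming queue rather than guaranteeing it the head position.  The names 
PushBackBatcher and collect are hypothetical.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model: buffer size is the primary limit, item count is secondary.
final class PushBackBatcher {

    static List<Long> collect(Deque<Long> incoming, long maxBufferBytes, int maxBatchCount) {
        List<Long> batch = new ArrayList<>();
        long totalBytes = 0;
        while (batch.size() < maxBatchCount) {
            Long next = incoming.pollFirst();
            if (next == null) {
                break; // nothing more is queued
            }
            // The first item is always accepted so an oversized file cannot stall the queue.
            if (!batch.isEmpty() && totalBytes + next > maxBufferBytes) {
                incoming.addFirst(next); // stand-in for session.transfer(flowFile)
                break;
            }
            batch.add(next);
            totalBytes += next;
        }
        return batch;
    }
}
```

Unlike a meet-or-exceed check, this variant never lets a batch of two or more 
files grow past the buffer limit, at the cost of one extra take-and-return on 
the item that did not fit.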

Thanks for your work on this (and all the other great AWS stuff, very 
popular extensions), and apologies for the lag on following up again on this 
issue.




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-10 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-195172976
  
@apiri Your recommendation for using buffer size is great.  So, should we 
add another property, total batch bytes, and then send the flow files in 
batches of that size?  If you have any other suggestions or pointers on how 
this can be implemented, please let me know and I will work on it.  Thanks 
again for your time.




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-03-10 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-194975737
  
@mans2singh I had a chance to sit down and revisit this.  Overall, it looks 
good and I was able to test a flow successfully putting to a Kinesis Firehose 
which aggregated and dumped to S3.

One thing mentioned in my initial review that we still need to cover is how 
we are handling batching.  I do think we need to handle that in a more 
constrained fashion given that file sizes could vary widely.  With how the 
processor is currently configured, it could hold up to 250MB in memory by 
default.  Instead, what would your thoughts be on converting this to a buffer 
size property?  If people want batching, they can specify a given memory size 
(perhaps something like 1 MB by default) and then we can wait until that 
threshold is hit or no more input flowfiles are available, at which point they 
are sent off in a batch.  If batching is not desired, they can either empty 
the buffer property or specify 0 bytes.

Thoughts on this approach?  Ultimately, we are trying to keep people from 
accidentally causing heap exhaustion.  With the approach described here, 
people can get as aggressive as they wish with batching while each instance 
uses a bounded amount of space.




[GitHub] nifi pull request: Nifi 1495 - AWS Kinesis Firehose

2016-02-23 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-188062568
  
Corrected title of pull request.  Please let me know if there is anything 
else required.

