[jira] [Commented] (SPARK-18620) Spark Streaming + Kinesis : Receiver MaxRate is violated

2018-05-10 Thread bruce_zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16470069#comment-16470069
 ] 

bruce_zhao commented on SPARK-18620:


This PR makes the input rate flat, but the number of records fetched per getRecords call is still uncontrolled.

In onStart(), the receiver initializes a configuration for the KCL worker:

val baseClientLibConfiguration = new KinesisClientLibConfiguration(
  checkpointAppName,
  streamName,
  kinesisProvider,
  dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
  cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
  workerId)

The KCL library then uses the default value DEFAULT_MAX_RECORDS (10,000) for getRecords.

Since Kinesis only supports a read rate of 2 MB per second per shard, it is easy to hit a ProvisionedThroughputExceededException, especially when we restart the application after a long stop.
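
One possible mitigation, shown here only as a hedged sketch: cap the per-call fetch by continuing the snippet above with KCL's withMaxRecords setter. withMaxRecords is part of the KCL 1.x KinesisClientLibConfiguration API, but the maxRecordsPerCall value and the idea of deriving it from spark.streaming.receiver.maxRate are assumptions for this sketch, not Spark's actual code.

// Illustrative continuation of the snippet above, not Spark's implementation.
// A cap like this could be derived from spark.streaming.receiver.maxRate and the
// block interval; 500 is only a placeholder.
val maxRecordsPerCall: Int = 500

val clientLibConfiguration = baseClientLibConfiguration
  .withMaxRecords(maxRecordsPerCall)   // bound records returned by each getRecords call
  .withKinesisEndpoint(endpointUrl)
  .withInitialPositionInStream(initialPositionInStream)
  .withTaskBackoffTimeMillis(500)
  .withRegionName(regionName)

This would at least keep a single getRecords call from pulling 10,000 records at once, though it does not by itself prevent throughput-exceeded errors while catching up after a long stop.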


> Spark Streaming + Kinesis : Receiver MaxRate is violated
> 
>
> Key: SPARK-18620
> URL: https://issues.apache.org/jira/browse/SPARK-18620
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.2
> Reporter: david przybill
> Assignee: Takeshi Yamamuro
> Priority: Minor
> Labels: kinesis
> Fix For: 2.2.0
>
> Attachments: Apply_limit in_spark_with_my_patch.png, Apply_limit in_vanilla_spark.png, Apply_no_limit.png
>
>
> I am calling spark-submit passing maxRate; I have a single Kinesis receiver and batches of 1s:
> spark-submit --conf spark.streaming.receiver.maxRate=10
> However, a single batch can greatly exceed the established maxRate, e.g. I'm getting 300 records.
> It looks like Kinesis is completely ignoring the spark.streaming.receiver.maxRate configuration.
> If you look inside KinesisReceiver.onStart, you see:
> val kinesisClientLibConfiguration =
>   new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
>     .withKinesisEndpoint(endpointUrl)
>     .withInitialPositionInStream(initialPositionInStream)
>     .withTaskBackoffTimeMillis(500)
>     .withRegionName(regionName)
> This constructor ends up calling another constructor that fills in many default configuration values. One of those is DEFAULT_MAX_RECORDS, which is hard-coded to 10,000 records.





[jira] [Commented] (SPARK-18620) Spark Streaming + Kinesis : Receiver MaxRate is violated

2016-12-01 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714116#comment-15714116
 ] 

Apache Spark commented on SPARK-18620:
--

User 'maropu' has created a pull request for this issue:
https://github.com/apache/spark/pull/16114




[jira] [Commented] (SPARK-18620) Spark Streaming + Kinesis : Receiver MaxRate is violated

2016-12-01 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15713894#comment-15713894
 ] 

Takeshi Yamamuro commented on SPARK-18620:
--

Yeah, I'll make a PR within a day.




[jira] [Commented] (SPARK-18620) Spark Streaming + Kinesis : Receiver MaxRate is violated

2016-12-01 Thread david przybill (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15713889#comment-15713889
 ] 

david przybill commented on SPARK-18620:


Looks good to me.
Thanks for the prompt answer.




[jira] [Commented] (SPARK-18620) Spark Streaming + Kinesis : Receiver MaxRate is violated

2016-12-01 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712392#comment-15712392
 ] 

Takeshi Yamamuro commented on SPARK-18620:
--

I tried to fix this issue: 
https://github.com/apache/spark/compare/master...maropu:SPARK-18620.
Also, I ran tests under three different conditions: no limit, applying a limit on vanilla Spark, and applying a limit on Spark with my patch (see the attached screenshots).
As the results show, my patch limits the number of input records much more naturally.
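
For reference, a minimal sketch of how such a comparison could be reproduced with the Kinesis receiver API available in Spark 2.x (KinesisUtils.createStream). The application name, stream name, endpoint, and region below are placeholders, and counting records per 1s batch is just one way to observe whether maxRate is honored; this is not the test harness used for the attached screenshots.

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MaxRateCheck {
  def main(args: Array[String]): Unit = {
    // spark.streaming.receiver.maxRate is normally passed via spark-submit --conf;
    // it is set here only to keep the sketch self-contained.
    val conf = new SparkConf()
      .setAppName("kinesis-maxrate-check")
      .set("spark.streaming.receiver.maxRate", "10")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholder stream/endpoint/region values; replace with a real Kinesis stream.
    val stream = KinesisUtils.createStream(
      ssc, "kinesis-maxrate-check", "my-stream",
      "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
      InitialPositionInStream.LATEST, Seconds(1), StorageLevel.MEMORY_AND_DISK_2)

    // Print the number of records in each 1-second batch; with maxRate=10 this
    // should stay close to 10 records if the limit is actually enforced.
    stream.foreachRDD { rdd => println(s"records in batch: ${rdd.count()}") }

    ssc.start()
    ssc.awaitTermination()
  }
}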




[jira] [Commented] (SPARK-18620) Spark Streaming + Kinesis : Receiver MaxRate is violated

2016-11-30 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711171#comment-15711171
 ] 

Takeshi Yamamuro commented on SPARK-18620:
--

I quickly checked and found that setting max records in the Kinesis workers is not enough, because the workers cannot limit the number of aggregated records delivered per message 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#d0e5184).
For example, if we set the workers' max records to 10 and a producer aggregates two records into one message, the workers can actually deliver 20 records per callback.
My hunch is that we need to control the number of records pushed into the receiver in 
KinesisRecordProcessor#processRecords (https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala#L68).
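
A rough sketch of that idea, not the actual patch: split each incoming batch inside processRecords into chunks no larger than the allowed rate and push one chunk per interval. The maxRecordsPerSecond parameter, the addRecords callback, and the sleep-based pacing are all illustrative assumptions, not Spark's implementation.

import com.amazonaws.services.kinesis.model.Record
import scala.collection.JavaConverters._

object RateLimitedProcessing {
  // Push at most maxRecordsPerSecond records per second into the receiver, so a single
  // processRecords callback (which may carry de-aggregated KPL records) cannot flood it.
  def processRecordsRateLimited(
      batch: java.util.List[Record],
      addRecords: Seq[Record] => Unit,   // stand-in for the receiver's store/addRecords path
      maxRecordsPerSecond: Int): Unit = {
    batch.asScala.grouped(maxRecordsPerSecond).foreach { chunk =>
      addRecords(chunk)
      Thread.sleep(1000)  // crude pacing; a real fix would use a proper rate limiter
    }
  }
}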
