[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546448#comment-16546448
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6300
  
Nice feature, thanks a lot.

Merged this into the 1.6 and 1.7 branches.


> Adapt maxRecords parameter in the getRecords call to optimize bytes read from 
> Kinesis 
> --
>
> Key: FLINK-9692
> URL: https://issues.apache.org/jira/browse/FLINK-9692
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: performance, pull-request-available
> Fix For: 1.6.0
>
>
> The Kinesis connector currently has a [constant 
> value|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213]
>  set for maxRecords that it can fetch from a single Kinesis getRecords call. 
> However, in most real-time scenarios, the average size of a Kinesis record 
> (in bytes) changes with the situation, i.e. you could be in a transient 
> scenario where you are reading large records and would hence like to fetch 
> fewer records in each getRecords call (so as not to exceed the 2 MB/sec 
> [per-shard limit|https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html] 
> on the getRecords call). 
> The idea here is to adapt the Kinesis connector to identify an average batch 
> size prior to making the getRecords call, so that the maxRecords parameter 
> can be tuned appropriately before making the call. 
> This feature can sit behind a 
> [ConsumerConfigConstants|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java]
>  flag that defaults to false. 
>  
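The adaptation the description proposes can be sketched as a small standalone helper. This is a hedged sketch: the constant names mirror the connector, but the values, signature, and class name here are illustrative, not the merged Flink code.

```java
public class AdaptiveFetchSizeSketch {
    // AWS per-shard read limit for getRecords: 2 MB/sec.
    static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024 * 1024;
    // Default cap on records per getRecords call (mirrors ConsumerConfigConstants).
    static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;

    /**
     * Picks maxRecords so that maxRecords * averageRecordSizeBytes * (fetches/sec)
     * stays under the per-shard byte limit. Falls back to the configured maximum
     * when no average size is known yet (e.g. before the first fetch).
     */
    static int adaptiveMaxRecordsPerFetch(long averageRecordSizeBytes,
                                          long fetchIntervalMillis,
                                          int configuredMax) {
        if (averageRecordSizeBytes == 0 || fetchIntervalMillis == 0) {
            return configuredMax;
        }
        // bytes consumed per second per record slot = avgSize * (1000 / interval)
        int adapted = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT
                / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
        // Never exceed the hard per-call maximum.
        return Math.min(adapted, DEFAULT_SHARD_GETRECORDS_MAX);
    }

    public static void main(String[] args) {
        // 100 KB records fetched every 200 ms (5 reads/sec): 2 MB/s / (100 KB * 5) -> 4 records
        System.out.println(adaptiveMaxRecordsPerFetch(100 * 1024, 200, 10000));
    }
}
```

With large records the derived fetch size shrinks, which is exactly the transient scenario the description calls out.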



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546430#comment-16546430
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6300




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543945#comment-16543945
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/6300
  
@StephanEwen I have run this on the following setup:
```
Number of shards on Kinesis stream: 384
Number of task slots: 384 / 192 / 96
Throughput achieved per shard (with adaptive reads): 1.95 MB/sec / 1.75 MB/sec / 1.6 MB/sec
```





[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542797#comment-16542797
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6300
  
I like the idea of this - should make it much easier to use.

Have you run this code on some heavier data stream to validate that it works well in practice?

If yes, I would be +1 to this.





[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542428#comment-16542428
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/6300
  
The idea here is that `maxNumberOfRecordsPerFetch` should never be set to a value that fetches records exceeding the read limit (2 MB/sec), per the math here:
```
2 MB/sec / (averageRecordSizeBytes * # reads/sec)
```
At least that's the intent - let me know if that makes sense or if something is amiss about the approach. If `maxNumberOfRecordsPerFetch` is somehow set such that the limit is exceeded, then yes, it will still be throttled by Kinesis.
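To make the throttling arithmetic in this comment concrete (illustrative numbers: the 50 KB average record size and 500 ms fetch interval are assumptions, not values from the PR):

```java
public class ReadLimitMath {
    public static void main(String[] args) {
        long limitBytesPerSec = 2L * 1024 * 1024; // Kinesis per-shard read limit
        long avgRecordSizeBytes = 50L * 1024;     // assumed average record size
        long fetchIntervalMillis = 500L;          // assumed interval -> 2 reads/sec
        long readsPerSec = 1000L / fetchIntervalMillis;

        // The formula from the comment above: limit / (avgSize * reads/sec)
        long maxRecords = limitBytesPerSec / (avgRecordSizeBytes * readsPerSec);

        // Resulting byte volume per second stays at or under the limit.
        long bytesReadPerSec = maxRecords * avgRecordSizeBytes * readsPerSec;
        System.out.println(maxRecords + " records/fetch, " + bytesReadPerSec + " bytes/sec");
    }
}
```

Because the division rounds down, the derived fetch size can only undershoot the limit, never overshoot it.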




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542423#comment-16542423
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202227834
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -134,6 +134,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
	/** The interval between each attempt to discover new shards. */
	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";

+	/** The config to turn on adaptive reads from a shard. */
+	public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.use.adaptive.reads";
--- End diff --

Changed.




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542424#comment-16542424
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202227845
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
	}
+
+	/**
+	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+	 * to optimize 2 Mb / sec read limits.
+	 *
+	 * @param averageRecordSizeBytes
+	 * @return adaptedMaxRecordsPerFetch
+	 */
+	protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
+		int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+		if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+			adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
+
+			// Ensure the value is not more than 1L
+			adaptedMaxRecordsPerFetch = adaptedMaxRecordsPerFetch <= ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ?
--- End diff --

Changed.




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542293#comment-16542293
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202201507
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
	}
+
+	/**
+	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+	 * to optimize 2 Mb / sec read limits.
+	 *
+	 * @param averageRecordSizeBytes
+	 * @return adaptedMaxRecordsPerFetch
+	 */
+	protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
+		int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+		if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+			adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
+
+			// Ensure the value is not more than 1L
+			adaptedMaxRecordsPerFetch = adaptedMaxRecordsPerFetch <= ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ?
--- End diff --

adaptedMaxRecordsPerFetch = Math.min(adaptedMaxRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542292#comment-16542292
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202199865
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -134,6 +134,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
	/** The interval between each attempt to discover new shards. */
	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";

+	/** The config to turn on adaptive reads from a shard. */
+	public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.use.adaptive.reads";
--- End diff --

[Most of Flink's feature flags](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html) are named `xx.enabled`, so I'd suggest renaming it to something like `flink.shard.adaptive.read.records.enabled`
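For context, such a flag would be toggled through the Properties handed to the Kinesis consumer. A hedged sketch using the key as it appears in the diff under review (the name finally merged may differ, per the naming discussion):

```java
import java.util.Properties;

public class AdaptiveReadsConfigExample {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("aws.region", "us-east-1");
        // Opt in to adaptive reads; the feature defaults to false.
        config.setProperty("flink.shard.use.adaptive.reads", "true");
        // config would then be passed to
        // new FlinkKinesisConsumer<>(streamName, deserializationSchema, config)
        System.out.println(config.getProperty("flink.shard.use.adaptive.reads"));
    }
}
```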




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542132#comment-16542132
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r202156901
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
	}
+
+	/**
+	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+	 * to optimize 2 Mb / sec read limits.
+	 *
+	 * @param averageRecordSizeBytes
+	 * @return adaptedMaxRecordsPerFetch
+	 */
+	private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
--- End diff --

Makes sense. Done.




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540708#comment-16540708
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/6300
  
@tzulitai can you please take a look - would be good if we can get this 
into v1.6.0




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540707#comment-16540707
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r201793459
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
	}
+
+	/**
+	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+	 * to optimize 2 Mb / sec read limits.
+	 *
+	 * @param averageRecordSizeBytes
+	 * @return adaptedMaxRecordsPerFetch
+	 */
+	private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
--- End diff --

Make this protected to allow for override? (Currently the shard consumer as a whole cannot be customized, but I think it should.)




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540706#comment-16540706
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/6300#discussion_r201792650
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -224,10 +232,19 @@ public void run() {
 					subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
 					subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
+				long recordBatchSizeBytes = 0L;
+				long averageRecordSizeBytes = 0L;
+
 				for (UserRecord record : fetchedRecords) {
+					recordBatchSizeBytes += record.getData().remaining();
 					deserializeRecordForCollectionAndUpdateState(record);
 				}
 
+				if (useAdaptiveReads && fetchedRecords.size() != 0) {
--- End diff --

nit: use `&& !fetchedRecords.isEmpty()` instead of `&& fetchedRecords.size() != 0`
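The bookkeeping in the hunk above (sum the remaining bytes of each record's payload, then divide by the batch size, guarding the empty-batch case the reviewer flags) can be sketched standalone as follows. `ByteBuffer` stands in for the payload of the connector's `UserRecord` type, and the method name is illustrative, not Flink's.

```java
import java.nio.ByteBuffer;
import java.util.List;

public class BatchSizeSketch {
    static long averageRecordSizeBytes(List<ByteBuffer> fetchedRecords) {
        long recordBatchSizeBytes = 0L;
        for (ByteBuffer data : fetchedRecords) {
            // remaining() = unread bytes in this record's payload.
            recordBatchSizeBytes += data.remaining();
        }
        // Guard against division by zero when the fetch returned no records.
        return fetchedRecords.isEmpty() ? 0L : recordBatchSizeBytes / fetchedRecords.size();
    }

    public static void main(String[] args) {
        List<ByteBuffer> batch = List.of(ByteBuffer.allocate(100), ByteBuffer.allocate(300));
        System.out.println(averageRecordSizeBytes(batch)); // → 200
    }
}
```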




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539196#comment-16539196
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

Github user glaksh100 commented on the issue:

https://github.com/apache/flink/pull/6300
  
@fhueske @tzulitai @tweise Can you please take a look when you have a chance?




[jira] [Commented] (FLINK-9692) Adapt maxRecords parameter in the getRecords call to optimize bytes read from Kinesis

2018-07-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16539095#comment-16539095
 ] 

ASF GitHub Bot commented on FLINK-9692:
---

GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/6300

[FLINK-9692] Adaptive reads from Kinesis

## What is the purpose of the change

The purpose of this change is to provide an option for the Kinesis connector to optimize the amount of data (in bytes) read from Kinesis. The Kinesis connector currently has a [constant value](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213) set for `maxNumberOfRecordsPerFetch` that it can fetch from a single Kinesis `getRecords` call. However, in most realtime scenarios, the average size (in bytes) of a Kinesis record is not constant.

The idea here is to have the Kinesis connector identify an average batch size prior to making the `getRecords` call, so that the `maxNumberOfRecordsPerFetch` parameter can be tuned as high as possible without exceeding the 2 Mb/sec [per-shard limit](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html).

This feature can be set using a [ConsumerConfigConstants](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java) flag that defaults to false.


## Brief change log
  - With an initial value for `maxNumberOfRecordsPerFetch`, the average size of a record returned in the batch of records is calculated
  - `maxNumberOfRecordsPerFetch` is then set to `(2 MB/sec * fetchIntervalMillis / 1000) / averageRecordSizeBytes` to maximize throughput in each `getRecords` call
  - This feature is turned on/off using a boolean in `ConsumerConfigConstants` - `SHARD_USE_ADAPTIVE_READS`
  - `DEFAULT_SHARD_USE_ADAPTIVE_READS` is set to `false`
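A minimal sketch of what enabling the flag through the consumer's `Properties` might look like. The property key string here is an assumption; the authoritative value is whatever `ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS` is defined as, and the region setting is illustrative boilerplate.

```java
import java.util.Properties;

public class AdaptiveReadsConfigSketch {
    public static void main(String[] args) {
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("aws.region", "us-east-1"); // illustrative required setup
        // Opt in to adaptive reads; per the change log, the feature defaults to false.
        // Key string is assumed; prefer referencing ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS.
        consumerConfig.setProperty("flink.shard.adaptivereads", "true");
        System.out.println(consumerConfig.getProperty("flink.shard.adaptivereads"));
    }
}
```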

## Verifying this change
This change added tests and can be verified as follows:
  - Added a `testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads` test method to `ShardConsumerTest`

## Does this pull request potentially affect one of the following parts:
  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation
  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/glaksh100/flink FLINK-9692.adaptiveKinesisReads

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6300.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6300


commit 0c29017d6d1e98359d3093aaaecc54338324e57e
Author: Lakshmi Gururaja Rao 
Date:   2018-07-10T18:40:02Z

[FLINK-9692] Adaptive reads from Kinesis



