[jira] [Assigned] (FLINK-4574) Strengthen fetch interval implementation in Kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei-Che Wei reassigned FLINK-4574: -- Assignee: (was: Wei-Che Wei) > Strengthen fetch interval implementation in Kinesis consumer > > > Key: FLINK-4574 > URL: https://issues.apache.org/jira/browse/FLINK-4574 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > Labels: pull-request-available > > As pointed out by [~rmetzger], right now the fetch interval implementation in > the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer > interval times than specified by the user, ex. say the specified fetch > interval is {{f}}, it takes {{x}} to complete a {{getRecords()}} call, and > {{y}} to complete processing the fetched records for emitting, than the > actual interval between each fetch is actually {{f+x+y}}. > The main problem with this is that we can never guarantee how much time has > past since the last {{getRecords}} call, thus can not guarantee that returned > shard iterators will not have expired the next time we use them, even if we > limit the user-given value for {{f}} to not be longer than the iterator > expire time. > I propose to improve this by, per {{ShardConsumer}}, use a > {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching, > and a separate blocking queue that collects the fetched records for emitting. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-4574) Strengthen fetch interval implementation in Kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei-Che Wei reassigned FLINK-4574: -- Assignee: Wei-Che Wei > Strengthen fetch interval implementation in Kinesis consumer > > > Key: FLINK-4574 > URL: https://issues.apache.org/jira/browse/FLINK-4574 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.1.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Wei-Che Wei > Fix For: 1.2.0, 1.1.4 > > > As pointed out by [~rmetzger], right now the fetch interval implementation in > the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer > interval times than specified by the user, ex. say the specified fetch > interval is {{f}}, it takes {{x}} to complete a {{getRecords()}} call, and > {{y}} to complete processing the fetched records for emitting, than the > actual interval between each fetch is actually {{f+x+y}}. > The main problem with this is that we can never guarantee how much time has > past since the last {{getRecords}} call, thus can not guarantee that returned > shard iterators will not have expired the next time we use them, even if we > limit the user-given value for {{f}} to not be longer than the iterator > expire time. > I propose to improve this by, per {{ShardConsumer}}, use a > {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching, > and a separate blocking queue that collects the fetched records for emitting. -- This message was sent by Atlassian JIRA (v6.3.4#6332)