[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520077#comment-16520077 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5992

> Use ListShards for shard discovery in the flink kinesis connector
> -----------------------------------------------------------------
>
>                 Key: FLINK-8944
>                 URL: https://issues.apache.org/jira/browse/FLINK-8944
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Kailash Hassan Dayanand
>            Priority: Minor
>              Labels: pull-request-available
>
> Currently, the DescribeStream AWS API used to get the list of shards has a
> restrictive rate limit on AWS (5 requests per second per account). This is
> problematic when running multiple Flink jobs on the same account, since each
> subtask calls DescribeStream. Changing this to ListShards will provide
> more flexibility on rate limits, as ListShards has a limit of 100 requests
> per second per data stream.
> More details on the mailing list. https://goo.gl/mRXjKh

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506155#comment-16506155 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on the issue:

    https://github.com/apache/flink/pull/5992

    Bumping this up in case it got lost in the queue. @tzulitai
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491434#comment-16491434 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on the issue:

    https://github.com/apache/flink/pull/5992

    Please note I rebased my changes to the master as I was having some failures when building just the flink-connector-kinesis maven project. I hope this will not cause problems with reviewing the code changes. Sorry for the inconvenience if it does. :(
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491432#comment-16491432 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r191033584

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -26,20 +29,60 @@
     import com.amazonaws.ClientConfigurationFactory;
     import com.amazonaws.services.kinesis.AmazonKinesis;
     import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ListShardsRequest;
    +import com.amazonaws.services.kinesis.model.ListShardsResult;
     import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +import com.amazonaws.services.kinesis.model.Shard;
    +import org.hamcrest.Description;
    +import org.hamcrest.TypeSafeDiagnosingMatcher;
    +import org.junit.Assert;
     import org.junit.Test;
     import org.powermock.reflect.Whitebox;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
     import java.util.Properties;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
    +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
     import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertFalse;
     import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.argThat;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
     /**
      * Test for methods in the {@link KinesisProxy} class.
      */
     public class KinesisProxyTest {
    +    private static final String NEXT_TOKEN = "NextToken";
    +    private static final String fakeStreamName = "fake-stream";
    +    private Set shardIdSet;
    +    private List shards;
    +
    +    protected static HashMap
    +    createInitialSubscribedStreamsToLastDiscoveredShardsState(List streams) {
    +        HashMap initial = new HashMap<>();
    +        for (String stream : streams) {
    +            initial.put(stream, null);
    +        }
    +        return initial;
    +    }
    +
    +    private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
    +        return new ListShardsRequestMatcher(null, null);
    +    }
    +
    +    private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
    --- End diff --

    Done
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491433#comment-16491433 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r191033585

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -26,20 +29,60 @@
     import com.amazonaws.ClientConfigurationFactory;
     import com.amazonaws.services.kinesis.AmazonKinesis;
     import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ListShardsRequest;
    +import com.amazonaws.services.kinesis.model.ListShardsResult;
     import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +import com.amazonaws.services.kinesis.model.Shard;
    +import org.hamcrest.Description;
    +import org.hamcrest.TypeSafeDiagnosingMatcher;
    +import org.junit.Assert;
     import org.junit.Test;
     import org.powermock.reflect.Whitebox;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
     import java.util.Properties;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
    +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
     import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertFalse;
     import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.argThat;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
     /**
      * Test for methods in the {@link KinesisProxy} class.
      */
     public class KinesisProxyTest {
    +    private static final String NEXT_TOKEN = "NextToken";
    +    private static final String fakeStreamName = "fake-stream";
    +    private Set shardIdSet;
    +    private List shards;
    --- End diff --

    Done
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16486812#comment-16486812 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r190140724

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -26,20 +29,60 @@
     import com.amazonaws.ClientConfigurationFactory;
     import com.amazonaws.services.kinesis.AmazonKinesis;
     import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ListShardsRequest;
    +import com.amazonaws.services.kinesis.model.ListShardsResult;
     import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +import com.amazonaws.services.kinesis.model.Shard;
    +import org.hamcrest.Description;
    +import org.hamcrest.TypeSafeDiagnosingMatcher;
    +import org.junit.Assert;
     import org.junit.Test;
     import org.powermock.reflect.Whitebox;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
     import java.util.Properties;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
    +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
     import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertFalse;
     import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.argThat;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
     /**
      * Test for methods in the {@link KinesisProxy} class.
      */
     public class KinesisProxyTest {
    +    private static final String NEXT_TOKEN = "NextToken";
    +    private static final String fakeStreamName = "fake-stream";
    +    private Set shardIdSet;
    +    private List shards;
    +
    +    protected static HashMap
    +    createInitialSubscribedStreamsToLastDiscoveredShardsState(List streams) {
    +        HashMap initial = new HashMap<>();
    +        for (String stream : streams) {
    +            initial.put(stream, null);
    +        }
    +        return initial;
    +    }
    +
    +    private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
    +        return new ListShardsRequestMatcher(null, null);
    +    }
    +
    +    private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
    --- End diff --

    nit: IMO, it would help with readability if we move these private utility methods after the main test ones.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16486811#comment-16486811 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r190140597

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -26,20 +29,60 @@
     import com.amazonaws.ClientConfigurationFactory;
     import com.amazonaws.services.kinesis.AmazonKinesis;
     import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
    +import com.amazonaws.services.kinesis.model.ListShardsRequest;
    +import com.amazonaws.services.kinesis.model.ListShardsResult;
     import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
    +import com.amazonaws.services.kinesis.model.Shard;
    +import org.hamcrest.Description;
    +import org.hamcrest.TypeSafeDiagnosingMatcher;
    +import org.junit.Assert;
     import org.junit.Test;
     import org.powermock.reflect.Whitebox;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
     import java.util.Properties;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
    +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
     import static org.junit.Assert.assertEquals;
     import static org.junit.Assert.assertFalse;
     import static org.junit.Assert.assertTrue;
    +import static org.mockito.Matchers.argThat;
    +import static org.mockito.Mockito.doReturn;
    +import static org.mockito.Mockito.mock;
     /**
      * Test for methods in the {@link KinesisProxy} class.
      */
     public class KinesisProxyTest {
    +    private static final String NEXT_TOKEN = "NextToken";
    +    private static final String fakeStreamName = "fake-stream";
    +    private Set shardIdSet;
    +    private List shards;
    --- End diff --

    Should we move these to be scoped only to the `testGetShardList` test method?
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481373#comment-16481373 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r189417750

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
    @@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
             return configProps;
         }
    +    public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
    +        if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {
    --- End diff --

    Done.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476706#comment-16476706 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188481796

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
    @@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
             return configProps;
         }
    +    public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
    +        if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {
    --- End diff --

    Minor: this could be generalized by iterating over a map of oldkey -> newkey and I would also suggest to log a warning for deprecated keys
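The generalization suggested in this comment (iterate over a map of old key -> new key and log a warning for each deprecated key) could look roughly like the sketch below. It is a minimal illustration, not the code from the pull request: the class name `DeprecatedKeyReplacer`, the replacement key `flink.list.shards.backoff.base`, and the use of `java.util.logging` are assumptions made so the example stays self-contained. Only the old key `flink.stream.describe.backoff.base` is taken from the diff in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Logger;

/** Illustrative sketch only; not the implementation from the pull request. */
class DeprecatedKeyReplacer {

    private static final Logger LOG = Logger.getLogger(DeprecatedKeyReplacer.class.getName());

    /** Old key -> new key. The new key name is a hypothetical placeholder. */
    private static final Map<String, String> DEPRECATED_CONSUMER_KEYS = new LinkedHashMap<>();

    static {
        DEPRECATED_CONSUMER_KEYS.put(
            "flink.stream.describe.backoff.base", // old key, as shown in the diff
            "flink.list.shards.backoff.base");    // assumed replacement key
    }

    /** Moves every deprecated key to its replacement and warns about each one found. */
    static Properties replaceDeprecatedKeys(Properties configProps) {
        for (Map.Entry<String, String> entry : DEPRECATED_CONSUMER_KEYS.entrySet()) {
            String oldKey = entry.getKey();
            String newKey = entry.getValue();
            if (configProps.containsKey(oldKey)) {
                LOG.warning("Config key '" + oldKey + "' is deprecated; use '" + newKey + "' instead.");
                Object value = configProps.remove(oldKey);
                // A value set explicitly under the new key takes precedence.
                if (!configProps.containsKey(newKey)) {
                    configProps.put(newKey, value);
                }
            }
        }
        return configProps;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("flink.stream.describe.backoff.base", "1000");
        System.out.println(replaceDeprecatedKeys(props));
    }
}
```

With this shape, supporting another renamed key only needs one more map entry. Whether an explicitly set new key should win over the deprecated one is a design choice this sketch makes on its own; the thread does not settle it.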
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476704#comment-16476704 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188481562

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
         /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
         public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
    -    /** The base backoff time between each describeStream attempt. */
    -    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --

    I have deprecated some old properties and added the new ones. This is my first time using the @deprecated annotations. Do let me know if there are better ways of doing this. I used sample PRs like https://goo.gl/LWcrp2 for these changes.
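For readers unfamiliar with the deprecation pattern mentioned in this comment, a common Java convention is to pair the `@Deprecated` annotation with a Javadoc `@deprecated` tag that names the replacement, and to keep reading the old key as a fallback. The sketch below only illustrates that convention. The class name, the constant `LIST_SHARDS_BACKOFF_BASE` with its key string, and the helper `resolveBackoffBase` are hypothetical; only `STREAM_DESCRIBE_BACKOFF_BASE` and its value come from the diff above.

```java
import java.util.Properties;

/** Illustrative sketch only; not the actual ConsumerConfigConstants class. */
class ConsumerConfigKeysSketch {

    /**
     * The base backoff time between each describeStream attempt.
     *
     * @deprecated Use {@link #LIST_SHARDS_BACKOFF_BASE} instead.
     */
    @Deprecated
    static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

    /** The base backoff time between each listShards attempt (assumed name and key). */
    static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

    /** Hypothetical helper: prefers the new key, then the deprecated key, then the default. */
    static String resolveBackoffBase(Properties props, String defaultValue) {
        String value = props.getProperty(LIST_SHARDS_BACKOFF_BASE);
        if (value == null) {
            value = props.getProperty(STREAM_DESCRIBE_BACKOFF_BASE);
        }
        return value != null ? value : defaultValue;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(STREAM_DESCRIBE_BACKOFF_BASE, "1000");
        System.out.println(resolveBackoffBase(props, "500"));
    }
}
```

The annotation makes the compiler flag uses of the old constant from other classes, while the Javadoc tag tells users which key to migrate to.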
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476701#comment-16476701 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188481158

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -97,8 +228,7 @@ public void testClientConfigOverride() {
             AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
             ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
    -            "clientConfiguration");
    +            "clientConfiguration");
             assertEquals(, clientConfiguration.getSocketTimeout());
         }
    -
    --- End diff --

    Done. Sorry for the inconvenience.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476700#comment-16476700 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188481152

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -97,8 +228,7 @@ public void testClientConfigOverride() {
             AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
             ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
    -            "clientConfiguration");
    +            "clientConfiguration");
    --- End diff --

    Done. Sorry for the inconvenience.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476260#comment-16476260 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188379050

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -97,8 +228,7 @@ public void testClientConfigOverride() {
             AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
             ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
    -            "clientConfiguration");
    +            "clientConfiguration");
             assertEquals(, clientConfiguration.getSocketTimeout());
         }
    -
    --- End diff --

    remove unnecessary change
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476257#comment-16476257 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188378986

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -97,8 +228,7 @@ public void testClientConfigOverride() {
             AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
             ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
    -            "clientConfiguration");
    +            "clientConfiguration");
    --- End diff --

    remove unnecessary change
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476252#comment-16476252 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188378324

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
         /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
         public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
    -    /** The base backoff time between each describeStream attempt. */
    -    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --

    I think that this should be one unit of work.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476243#comment-16476243 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377363

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
         /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
         public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
    -    /** The base backoff time between each describeStream attempt. */
    -    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --

    For the time being, I will keep the older names and make the changes to deprecate the property names in the followup PR. I am not sure what the policy is with respect to changes size. I feel breaking this up to 2 different PRs will make it easier to review. Let me know if you feel otherwise. I will pull in those changes into this PR then.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476244#comment-16476244 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377376

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
          * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
          * @return the result of the describe stream operation
          */
    -    private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    -        final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    -        describeStreamRequest.setStreamName(streamName);
    -        describeStreamRequest.setExclusiveStartShardId(startShardId);
    +    private ListShardsResult listShards(String streamName, @Nullable String startShardId,
    +                                        @Nullable String startNextToken)
    +            throws InterruptedException {
    +        final ListShardsRequest listShardsRequest = new ListShardsRequest();
    +        if (startNextToken == null) {
    +            listShardsRequest.setExclusiveStartShardId(startShardId);
    +            listShardsRequest.setStreamName(streamName);
    +        } else {
    +            // Note the nextToken returned by AWS expires within 300 sec.
    +            listShardsRequest.setNextToken(startNextToken);
    +        }
    -        DescribeStreamResult describeStreamResult = null;
    +        ListShardsResult listShardsResults = null;
    -        // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
    +        // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
             int attemptCount = 0;
    -        while (describeStreamResult == null) { // retry until we get a result
    +        // List Shards returns just the first 1000 shard entries. Make sure that all entries
    +        // are taken up.
    +        while (listShardsResults == null) { // retry until we get a result
                 try {
    -                describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
    +                listShardsResults = kinesisClient.listShards(listShardsRequest);
                 } catch (LimitExceededException le) {
                     long backoffMillis = fullJitterBackoff(
    -                    describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
    -                LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    -                    + backoffMillis + " millis.");
    +                    listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
    +                LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
    +                    + ". Backing off for " + backoffMillis + " millis.");
                     Thread.sleep(backoffMillis);
    -            } catch (ResourceNotFoundException re) {
    -                throw new RuntimeException("Error while getting stream details", re);
    -            }
    -        }
    -
    -        String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
    -        if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
    -            if (LOG.isWarnEnabled()) {
    -                LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
    -                    "describeStream operation will not contain any shard information.");
    +            } catch (ResourceInUseException reInUse) {
    +                if (LOG.isWarnEnabled()) {
    +                    // List Shards will throw an exception if stream in not in active state. Will return
    +                    LOG.warn("The stream is currently not in acti
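
The hunk quoted above retries ListShards with a "full-jitter backoff" whenever a LimitExceededException is thrown, but the body of `fullJitterBackoff` is not part of the quoted diff. As a rough sketch only, and assuming the usual full-jitter scheme (a uniformly random delay below a capped exponential ceiling), such a helper might look like the following. The class name and the parameter names are hypothetical and are not taken from the connector.

```java
import java.util.Random;

/** Illustrative only: a full-jitter backoff helper with hypothetical names. */
class FullJitterBackoffSketch {
    private static final Random RANDOM = new Random();

    /**
     * Returns a random delay in [0, ceiling), where
     * ceiling = min(maxMillis, baseMillis * power^attempt).
     */
    static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
        // Exponential growth, capped so the delay never exceeds maxMillis.
        long ceiling = (long) Math.min((double) maxMillis, baseMillis * Math.pow(power, attempt));
        // Full jitter: pick uniformly below the ceiling instead of sleeping for the ceiling itself.
        return (long) (RANDOM.nextDouble() * ceiling);
    }
}
```

Randomizing the whole delay, rather than adding a small jitter on top of a fixed delay, spreads out retries from many subtasks that hit the rate limit at the same time.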
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476246#comment-16476246 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377415

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
         private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
             List<StreamShardHandle> shardsOfStream = new ArrayList<>();
    -        DescribeStreamResult describeStreamResult;
    +        // List Shards returns just the first 1000 shard entries. In order to read the entire stream,
    +        // we need to use the returned nextToken to get additional shards.
    +        ListShardsResult listShardsResult;
    +        String startShardToken = null;
             do {
    -            describeStreamResult = describeStream(streamName, lastSeenShardId);
    -
    -            List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +            listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
    --- End diff --

    Thanks for catching this. I have fixed it by clearing shardsOfStream, so that we return an empty shardsOfStream if an ExpiredNextTokenException occurs. The intended behavior is: in the unlikely case that the next token has expired, we just return an empty shardsOfStream. This should be fine, since an empty shardsOfStream is also what is returned by default when no new shards are discovered.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476245#comment-16476245 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377393

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
          * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
          * @return the result of the describe stream operation
          */
    -    private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    -        final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    -        describeStreamRequest.setStreamName(streamName);
    -        describeStreamRequest.setExclusiveStartShardId(startShardId);
    +    private ListShardsResult listShards(String streamName, @Nullable String startShardId,
    +                                        @Nullable String startNextToken)
    +            throws InterruptedException {
    +        final ListShardsRequest listShardsRequest = new ListShardsRequest();
    +        if (startNextToken == null) {
    +            listShardsRequest.setExclusiveStartShardId(startShardId);
    +            listShardsRequest.setStreamName(streamName);
    +        } else {
    +            // Note the nextToken returned by AWS expires within 300 sec.
    +            listShardsRequest.setNextToken(startNextToken);
    +        }
    -        DescribeStreamResult describeStreamResult = null;
    +        ListShardsResult listShardsResults = null;
    -        // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
    +        // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
             int attemptCount = 0;
    -        while (describeStreamResult == null) { // retry until we get a result
    +        // List Shards returns just the first 1000 shard entries. Make sure that all entries
    +        // are taken up.
    +        while (listShardsResults == null) { // retry until we get a result
                 try {
    -                describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
    +                listShardsResults = kinesisClient.listShards(listShardsRequest);
                 } catch (LimitExceededException le) {
                     long backoffMillis = fullJitterBackoff(
    -                    describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
    -                LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    -                    + backoffMillis + " millis.");
    +                    listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
    +                LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
    +                    + ". Backing off for " + backoffMillis + " millis.");
                     Thread.sleep(backoffMillis);
    -            } catch (ResourceNotFoundException re) {
    -                throw new RuntimeException("Error while getting stream details", re);
    -            }
    -        }
    -
    -        String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
    -        if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
    -            if (LOG.isWarnEnabled()) {
    -                LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
    -                    "describeStream operation will not contain any shard information.");
    +            } catch (ResourceInUseException reInUse) {
    +                if (LOG.isWarnEnabled()) {
    +                    // List Shards will throw an exception if stream in not in active state. Will return
    +                    LOG.warn("The stream is currently not in acti
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476247#comment-16476247 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r188377434

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() {
         }
         @Test
    -    public void testCustomConfigurationOverride() {
    -        Properties configProps = new Properties();
    -        configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -        KinesisProxy proxy = new KinesisProxy(configProps) {
    -            @Override
    -            protected AmazonKinesis createKinesisClient(Properties configProps) {
    -                ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
    -                clientConfig.setSocketTimeout(1);
    -                return AWSUtil.createKinesisClient(configProps, clientConfig);
    +    public void testGetShardList() throws Exception {
    +        List<String> shardIds =
    +                Arrays.asList(
    +                        "shardId-",
    +                        "shardId-0001",
    +                        "shardId-0002",
    +                        "shardId-0003");
    +        shardIdSet = new HashSet<>(shardIds);
    +        shards =
    +                shardIds
    +                        .stream()
    +                        .map(shardId -> new Shard().withShardId(shardId))
    +                        .collect(Collectors.toList());
    +        Properties kinesisConsumerConfig = new Properties();
    +        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    +        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey");
    +        kinesisConsumerConfig.setProperty(
    +                ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey");
    +        KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
    +        AmazonKinesis mockClient = mock(AmazonKinesis.class);
    +        Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient);
    +
    +        ListShardsResult responseWithMoreData =
    +                new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
    +        ListShardsResult responseFinal =
    +                new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
    +        doReturn(responseWithMoreData)
    +                .when(mockClient)
    +                .listShards(argThat(initialListShardsRequestMatcher()));
    +        doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
    +        HashMap<String, String> streamHashMap =
    +                createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
    +        GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap);
    +
    +        Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
    +
    +        Set<String> expectedStreams = new HashSet<>();
    +        expectedStreams.add(fakeStreamName);
    +        Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams);
    +        List<StreamShardHandle> actualShardList =
    +                shardListResult.getRetrievedShardListOfStream(fakeStreamName);
    +        List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
    +        System.out.println(actualShardList.toString());
    +        assertThat(actualShardList, hasSize(4));
    +        for (int i = 0; i < 4; i++) {
    +            StreamShardHandle shardHandle =
    +                    new StreamShardHandle(
    +                            fakeStreamName,
    +                            new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
    +            expectedStreamShard.add(shardHandle);
    +        }
    +
    +        Assert.assertThat(
    +                actualShardList,
    +                containsInAnyOrder(
    +                        expectedStreamShard.toArray(ne
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476075#comment-16476075 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tweise commented on the issue:

    https://github.com/apache/flink/pull/5992

    It looks like we can either leave all the application-facing constants as they are (which might look a bit strange, given that they refer to the old API while we now use the new one), or deprecate and duplicate them :) If the latter, I would suggest mapping the old/deprecated property keys to the new ones in a single place and issuing a deprecation warning.
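
The "map in a single place and warn" suggestion above could be sketched roughly as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, the new key `flink.list.shards.backoff.base` is an invented placeholder (only the old key `flink.stream.describe.backoff.base` appears in this thread), and the warning goes to stderr instead of the connector's logger.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Illustrative only: translates deprecated property keys to new ones in one place. */
class DeprecatedKeyMapperSketch {
    // Old key -> new key. The new key name below is an assumption made for this example.
    static final Map<String, String> DEPRECATED_TO_NEW = new LinkedHashMap<>();

    static {
        DEPRECATED_TO_NEW.put("flink.stream.describe.backoff.base", "flink.list.shards.backoff.base");
    }

    /** Returns a copy of the config in which deprecated keys are replaced by their new names. */
    static Properties replaceDeprecatedKeys(Properties config) {
        Properties result = new Properties();
        result.putAll(config);
        for (Map.Entry<String, String> entry : DEPRECATED_TO_NEW.entrySet()) {
            String oldKey = entry.getKey();
            String newKey = entry.getValue();
            String oldValue = config.getProperty(oldKey);
            if (oldValue == null) {
                continue; // deprecated key not used, nothing to translate
            }
            System.err.println("WARN: property '" + oldKey + "' is deprecated, use '" + newKey + "' instead.");
            result.remove(oldKey);
            // An explicitly set new key takes precedence over the deprecated one.
            if (result.getProperty(newKey) == null) {
                result.setProperty(newKey, oldValue);
            }
        }
        return result;
    }
}
```

With this shape, the rest of the code reads only the new keys, and existing user configurations that still set the old keys keep working.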
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473344#comment-16473344 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788284

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
          * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
          * @return the result of the describe stream operation
          */
    -    private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    -        final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    -        describeStreamRequest.setStreamName(streamName);
    -        describeStreamRequest.setExclusiveStartShardId(startShardId);
    +    private ListShardsResult listShards(String streamName, @Nullable String startShardId,
    +                                        @Nullable String startNextToken)
    +            throws InterruptedException {
    +        final ListShardsRequest listShardsRequest = new ListShardsRequest();
    +        if (startNextToken == null) {
    +            listShardsRequest.setExclusiveStartShardId(startShardId);
    +            listShardsRequest.setStreamName(streamName);
    +        } else {
    +            // Note the nextToken returned by AWS expires within 300 sec.
    +            listShardsRequest.setNextToken(startNextToken);
    +        }
    -        DescribeStreamResult describeStreamResult = null;
    +        ListShardsResult listShardsResults = null;
    -        // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
    +        // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
             int attemptCount = 0;
    -        while (describeStreamResult == null) { // retry until we get a result
    +        // List Shards returns just the first 1000 shard entries. Make sure that all entries
    +        // are taken up.
    +        while (listShardsResults == null) { // retry until we get a result
                 try {
    -                describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
    +                listShardsResults = kinesisClient.listShards(listShardsRequest);
                 } catch (LimitExceededException le) {
                     long backoffMillis = fullJitterBackoff(
    -                    describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
    -                LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    -                    + backoffMillis + " millis.");
    +                    listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
    +                LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
    +                    + ". Backing off for " + backoffMillis + " millis.");
                     Thread.sleep(backoffMillis);
    -            } catch (ResourceNotFoundException re) {
    -                throw new RuntimeException("Error while getting stream details", re);
    -            }
    -        }
    -
    -        String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
    -        if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
    -            if (LOG.isWarnEnabled()) {
    -                LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
    -                    "describeStream operation will not contain any shard information.");
    +            } catch (ResourceInUseException reInUse) {
    +                if (LOG.isWarnEnabled()) {
    +                    // List Shards will throw an exception if stream in not in active state. Will return
    +                    LOG.warn("The stream is currently not in activ
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473346#comment-16473346 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788213

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
         /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
         public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
    -    /** The base backoff time between each describeStream attempt. */
    -    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --

    Changing the name of this variable, strictly speaking, breaks backwards compatibility, as users might be using it.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473345#comment-16473345 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788223

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
         /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
         public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
    -    /** The base backoff time between each describeStream attempt. */
    -    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --

    The same goes for all the other key variable renames in this class. I would suggest deprecating the existing ones if we want to change the names internally.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473347#comment-16473347 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788338

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
         private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
             List<StreamShardHandle> shardsOfStream = new ArrayList<>();
    -        DescribeStreamResult describeStreamResult;
    +        // List Shards returns just the first 1000 shard entries. In order to read the entire stream,
    +        // we need to use the returned nextToken to get additional shards.
    +        ListShardsResult listShardsResult;
    +        String startShardToken = null;
             do {
    -            describeStreamResult = describeStream(streamName, lastSeenShardId);
    -
    -            List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +            listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
    --- End diff --

    If we catch the `ExpiredNextTokenException` within `listShards(...)`, then `null` will be returned as the result, correct? If so, the `shardsOfStream` built up so far will be returned immediately, regardless of whether or not more shards follow. Although expired tokens might not be too common here, I wonder if we can handle this case more gracefully (e.g., by re-fetching a token to make sure that there really are no more shards).
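
One way to read the "re-fetch a token" idea above is to restart pagination from the last shard already collected whenever the token expires. The sketch below illustrates that idea only; it is not the code in the pull request. To stay self-contained it does not use the AWS SDK: `Page`, `ShardLister`, and `ExpiredTokenError` are hypothetical stand-ins for `ListShardsResult`, the Kinesis client, and `ExpiredNextTokenException`, and it assumes that listing with an exclusive start shard ID resumes right after that shard.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: token-based pagination that restarts when the token expires. */
class ShardPagingSketch {
    /** Stand-in for one ListShards response page. */
    static final class Page {
        final List<String> shardIds;
        final String nextToken; // null when there are no further pages

        Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /** Stand-in for the expired-token error raised by the service. */
    static final class ExpiredTokenError extends RuntimeException {
    }

    /** Stand-in for the client: a call uses either a start shard ID or a token, never both. */
    interface ShardLister {
        Page list(String exclusiveStartShardId, String nextToken);
    }

    static List<String> listAllShards(ShardLister lister, String lastSeenShardId, int maxRestarts) {
        List<String> shards = new ArrayList<>();
        String startShardId = lastSeenShardId;
        String token = null;
        int restarts = 0;
        while (true) {
            Page page;
            try {
                page = lister.list(token == null ? startShardId : null, token);
            } catch (ExpiredTokenError e) {
                if (++restarts > maxRestarts) {
                    throw e; // give up instead of looping forever
                }
                // Drop the stale token and resume after the last shard collected so far.
                token = null;
                if (!shards.isEmpty()) {
                    startShardId = shards.get(shards.size() - 1);
                }
                continue;
            }
            shards.addAll(page.shardIds);
            token = page.nextToken;
            if (token == null) {
                return shards;
            }
        }
    }
}
```

Compared with returning an empty or partial list, this keeps the shards already collected and only repeats the request that failed, at the cost of an extra restart counter to bound the retries.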
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473343#comment-16473343 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788278

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
          * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
          * @return the result of the describe stream operation
          */
    -    private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    -        final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    -        describeStreamRequest.setStreamName(streamName);
    -        describeStreamRequest.setExclusiveStartShardId(startShardId);
    +    private ListShardsResult listShards(String streamName, @Nullable String startShardId,
    +                                        @Nullable String startNextToken)
    +            throws InterruptedException {
    +        final ListShardsRequest listShardsRequest = new ListShardsRequest();
    +        if (startNextToken == null) {
    +            listShardsRequest.setExclusiveStartShardId(startShardId);
    +            listShardsRequest.setStreamName(streamName);
    +        } else {
    +            // Note the nextToken returned by AWS expires within 300 sec.
    +            listShardsRequest.setNextToken(startNextToken);
    +        }
    -        DescribeStreamResult describeStreamResult = null;
    +        ListShardsResult listShardsResults = null;
    -        // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
    +        // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
             int attemptCount = 0;
    -        while (describeStreamResult == null) { // retry until we get a result
    +        // List Shards returns just the first 1000 shard entries. Make sure that all entries
    +        // are taken up.
    +        while (listShardsResults == null) { // retry until we get a result
                 try {
    -                describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
    +                listShardsResults = kinesisClient.listShards(listShardsRequest);
                 } catch (LimitExceededException le) {
                     long backoffMillis = fullJitterBackoff(
    -                    describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
    -                LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
    -                    + backoffMillis + " millis.");
    +                    listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
    +                LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
    +                    + ". Backing off for " + backoffMillis + " millis.");
                     Thread.sleep(backoffMillis);
    -            } catch (ResourceNotFoundException re) {
    -                throw new RuntimeException("Error while getting stream details", re);
    -            }
    -        }
    -
    -        String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
    -        if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
    -            if (LOG.isWarnEnabled()) {
    -                LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
    -                    "describeStream operation will not contain any shard information.");
    +            } catch (ResourceInUseException reInUse) {
    +                if (LOG.isWarnEnabled()) {
    +                    // List Shards will throw an exception if stream in not in active state. Will return
    +                    LOG.warn("The stream is currently not in activ
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473342#comment-16473342 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788363

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
    @@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() {
         }
         @Test
    -    public void testCustomConfigurationOverride() {
    -        Properties configProps = new Properties();
    -        configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    -        KinesisProxy proxy = new KinesisProxy(configProps) {
    -            @Override
    -            protected AmazonKinesis createKinesisClient(Properties configProps) {
    -                ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
    -                clientConfig.setSocketTimeout(1);
    -                return AWSUtil.createKinesisClient(configProps, clientConfig);
    +    public void testGetShardList() throws Exception {
    +        List<String> shardIds =
    +                Arrays.asList(
    +                        "shardId-",
    +                        "shardId-0001",
    +                        "shardId-0002",
    +                        "shardId-0003");
    +        shardIdSet = new HashSet<>(shardIds);
    +        shards =
    +                shardIds
    +                        .stream()
    +                        .map(shardId -> new Shard().withShardId(shardId))
    +                        .collect(Collectors.toList());
    +        Properties kinesisConsumerConfig = new Properties();
    +        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    +        kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey");
    +        kinesisConsumerConfig.setProperty(
    +                ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey");
    +        KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
    +        AmazonKinesis mockClient = mock(AmazonKinesis.class);
    +        Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient);
    +
    +        ListShardsResult responseWithMoreData =
    +                new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
    +        ListShardsResult responseFinal =
    +                new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
    +        doReturn(responseWithMoreData)
    +                .when(mockClient)
    +                .listShards(argThat(initialListShardsRequestMatcher()));
    +        doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
    +        HashMap<String, String> streamHashMap =
    +                createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
    +        GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap);
    +
    +        Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
    +
    +        Set<String> expectedStreams = new HashSet<>();
    +        expectedStreams.add(fakeStreamName);
    +        Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams);
    +        List<StreamShardHandle> actualShardList =
    +                shardListResult.getRetrievedShardListOfStream(fakeStreamName);
    +        List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
    +        System.out.println(actualShardList.toString());
    +        assertThat(actualShardList, hasSize(4));
    +        for (int i = 0; i < 4; i++) {
    +            StreamShardHandle shardHandle =
    +                    new StreamShardHandle(
    +                            fakeStreamName,
    +                            new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
    +            expectedStreamShard.add(shardHandle);
    +        }
    +
    +        Assert.assertThat(
    +                actualShardList,
    +                containsInAnyOrder(
    +                        expectedStreamShard.toArray(new
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472109#comment-16472109 ]

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on the issue:

    https://github.com/apache/flink/pull/5992

    R: @tzulitai @tweise
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472009#comment-16472009 ]

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187632455

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
         public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";

         /** The base backoff time between each describeStream attempt. */
    -    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --

    I have kept the property string values the same but renamed the variables to refer to listShards, for readability.
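
To illustrate the distinction discussed in this thread between the Java constant name and the property key string, here is a minimal sketch. The key string `flink.stream.describe.backoff.base` and the constant `STREAM_DESCRIBE_BACKOFF_BASE` come from the quoted diff; the new constant name `LIST_SHARDS_BACKOFF_BASE`, the class `BackoffConfigKeys` and the helper `backoffBase` are assumptions made for this example. Keeping the old constant as a deprecated alias is one possible option, not necessarily what the pull request does.

```java
import java.util.Properties;

final class BackoffConfigKeys {
    /** Constant from the quoted diff; the string is what users put in their configuration. */
    @Deprecated
    static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

    /** Hypothetical new name (assumption): same key string, clearer Java identifier. */
    static final String LIST_SHARDS_BACKOFF_BASE = "flink.stream.describe.backoff.base";

    /** Reads the base backoff in milliseconds, falling back to the given default. */
    static long backoffBase(Properties config, long defaultMillis) {
        return Long.parseLong(
                config.getProperty(LIST_SHARDS_BACKOFF_BASE, Long.toString(defaultMillis)));
    }

    public static void main(String[] args) {
        // An existing user configuration written against the old key keeps working,
        // because only the Java identifier changed, not the key string.
        Properties userConfig = new Properties();
        userConfig.setProperty("flink.stream.describe.backoff.base", "250");
        System.out.println(backoffBase(userConfig, 1000L));
    }
}
```

Existing configuration files are unaffected by such a rename. Code that references the old Java constant would still break at compile time unless an alias like the one above is kept.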
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472004#comment-16472004 ]

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187630051

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
         public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";

         /** The base backoff time between each describeStream attempt. */
    -    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --

    Would it be better to add a new set of constants for listShards? Semantically they are equivalent, so reusing the same names for listShards should be okay, if a little confusing.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472001#comment-16472001 ]

ASF GitHub Bot commented on FLINK-8944:
---

Github user tweise commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187627524

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
    @@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
         public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";

         /** The base backoff time between each describeStream attempt. */
    -    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
    --- End diff --

    This would be a breaking change. We should leave these properties as is, if they are semantically equivalent.
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471994#comment-16471994 ]

ASF GitHub Bot commented on FLINK-8944:
---

GitHub user kailashhd opened a pull request:

    https://github.com/apache/flink/pull/5992

    [FLINK-8944] [Kinesis Connector] Use listShards instead of DescribeStream for shard discovery as it offer higher rate limits

    ## What is the purpose of the change

    ListShards has a higher AWS rate limit than DescribeStream (whose limit applies at the AWS account level), allowing faster shard discovery by the Kinesis data source when streams are changed (re-sharded).

    ## Brief change log

    - Change the Kinesis connector to use listShards instead of DescribeStream for shard discovery.

    ## Verifying this change

    This change added tests and can be verified as follows:

    - Added a unit test that checks the code path, mocking out the Kinesis dependencies.
    - Tested by running a small Flink job with this connector.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (yes / *no*)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / *no*)
    - The serializers: (yes / *no* / don't know)
    - The runtime per-record code paths (performance sensitive): (yes / *no* / don't know)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know)
    - The S3 file system connector: (yes / *no* / don't know)

    ## Documentation

    - Does this pull request introduce a new feature? (yes / *no*)
    - If yes, how is the feature documented? (*not applicable* / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kailashhd/flink KinesisProxy

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5992.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5992

commit 3188f24b13c9009e977b6fb25da4d40c93fc811e
Author: Kailash HD
Date:   2018-03-26T16:42:25Z

    [FLINK-8944] [Kinesis Connector] Use listShards instead of DescribeStream for shard discovery as it offer higher rate limits
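
As a back-of-the-envelope check of the rate-limit motivation, the sketch below estimates the shard-discovery request rate. The two limits (5 requests per second per account for DescribeStream, 100 per second per data stream for ListShards) are the figures quoted in the issue description. The subtask count, stream count and discovery interval are made-up inputs, and the class and method names are hypothetical. It assumes, as the issue states, that every subtask issues its own discovery call for each stream once per interval.

```java
final class DiscoveryRateEstimate {
    // Limits as quoted in the issue description.
    static final double DESCRIBE_STREAM_LIMIT_PER_ACCOUNT = 5.0;
    static final double LIST_SHARDS_LIMIT_PER_STREAM = 100.0;

    /** Requests per second if every subtask polls every stream once per interval. */
    static double requestsPerSecond(int subtasks, int streams, long intervalMillis) {
        return (double) subtasks * streams * 1000.0 / intervalMillis;
    }

    public static void main(String[] args) {
        // Hypothetical workload: 64 subtasks in total across all jobs on one account,
        // one stream, one discovery round every 10 seconds.
        double rate = requestsPerSecond(64, 1, 10_000L);
        System.out.println("requests/s: " + rate);
        System.out.println("over DescribeStream account limit: "
                + (rate > DESCRIBE_STREAM_LIMIT_PER_ACCOUNT));
        System.out.println("over ListShards per-stream limit: "
                + (rate > LIST_SHARDS_LIMIT_PER_STREAM));
    }
}
```

With these assumed inputs the rate works out to 6.4 requests per second, which exceeds the account-wide DescribeStream figure but stays well under the per-stream ListShards figure.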
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442760#comment-16442760 ]

Kailash Hassan Dayanand commented on FLINK-8944:
---

[Update]: There is currently a minor issue with the listShards API in AWS that blocks these changes: [https://github.com/aws/aws-sdk-java/issues/1490] (see the last comment). One workaround would be to fetch the list of all shards every time, which is slightly inefficient. I am waiting for some time to allow the AWS changes to go in before submitting this.
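
The workaround mentioned in this comment (fetching all shards every time) could be paired with a client-side filter that keeps only shards not seen before. The sketch below is one way to picture that and is not code from the pull request. `NewShardFilter` and `shardsAfter` are hypothetical names, and it assumes shard ids are zero-padded so that plain string comparison matches shard order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class NewShardFilter {
    /**
     * Returns the shard ids that sort strictly after lastSeenShardId.
     * A null lastSeenShardId means nothing has been discovered yet, so all ids are returned.
     * Assumption: ids are zero-padded, so lexicographic order equals shard order.
     */
    static List<String> shardsAfter(List<String> allShardIds, String lastSeenShardId) {
        if (lastSeenShardId == null) {
            return allShardIds;
        }
        return allShardIds.stream()
                .filter(id -> id.compareTo(lastSeenShardId) > 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> fullList = Arrays.asList(
                "shardId-000000000000",
                "shardId-000000000001",
                "shardId-000000000002");
        // Only the two shards after the last seen one remain.
        System.out.println(shardsAfter(fullList, "shardId-000000000000"));
    }
}
```

The cost of this approach is the one the comment points out: every discovery round transfers the complete shard list even when nothing has changed.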