[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-06-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5992


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-25 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r191033585
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -26,20 +29,60 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
  */
 public class KinesisProxyTest {
+   private static final String NEXT_TOKEN = "NextToken";
+   private static final String fakeStreamName = "fake-stream";
+   private Set<String> shardIdSet;
+   private List<Shard> shards;
--- End diff --

Done


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-25 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r191033584
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -26,20 +29,60 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
  */
 public class KinesisProxyTest {
+   private static final String NEXT_TOKEN = "NextToken";
+   private static final String fakeStreamName = "fake-stream";
+   private Set<String> shardIdSet;
+   private List<Shard> shards;
+
+   protected static HashMap<String, String>
+   createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
+   HashMap<String, String> initial = new HashMap<>();
+   for (String stream : streams) {
+   initial.put(stream, null);
+   }
+   return initial;
+   }
+
+   private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
+   return new ListShardsRequestMatcher(null, null);
+   }
+
+   private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
--- End diff --

Done


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r190140597
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -26,20 +29,60 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
  */
 public class KinesisProxyTest {
+   private static final String NEXT_TOKEN = "NextToken";
+   private static final String fakeStreamName = "fake-stream";
+   private Set<String> shardIdSet;
+   private List<Shard> shards;
--- End diff --

Should we move these to be scoped only to the `testGetShardList` test 
method?


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r190140724
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -26,20 +29,60 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
  */
 public class KinesisProxyTest {
+   private static final String NEXT_TOKEN = "NextToken";
+   private static final String fakeStreamName = "fake-stream";
+   private Set<String> shardIdSet;
+   private List<Shard> shards;
+
+   protected static HashMap<String, String>
+   createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
+   HashMap<String, String> initial = new HashMap<>();
+   for (String stream : streams) {
+   initial.put(stream, null);
+   }
+   return initial;
+   }
+
+   private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
+   return new ListShardsRequestMatcher(null, null);
+   }
+
+   private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
--- End diff --

nit: IMO, it would help with readability if we move these private utility 
methods after the main test ones.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-18 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r189417750
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
return configProps;
}
 
+   public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
+   if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {
--- End diff --

Done.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481796
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
@@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
return configProps;
}
 
+   public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
+   if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {
--- End diff --

Minor: this could be generalized by iterating over a map of old key -> new 
key. I would also suggest logging a warning for deprecated keys.
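A minimal sketch of the map-based generalization suggested above, assuming a plain `Properties` object and `java.util.logging` in place of the connector's own logger. `DeprecatedKeyReplacer` and `replaceDeprecatedKeys` are hypothetical names; the old key comes from the diff, while the new key `flink.list.shards.backoff.base` is an illustrative placeholder, not necessarily the name the PR settles on.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Logger;

public class DeprecatedKeyReplacer {

    private static final Logger LOG = Logger.getLogger(DeprecatedKeyReplacer.class.getName());

    /**
     * Moves the value of every deprecated key in {@code configProps} to its replacement key
     * and logs a warning for each deprecated key found. If the new key is already set,
     * its value is kept and the deprecated entry is simply dropped.
     */
    public static Properties replaceDeprecatedKeys(Properties configProps, Map<String, String> oldToNewKeys) {
        for (Map.Entry<String, String> entry : oldToNewKeys.entrySet()) {
            String oldKey = entry.getKey();
            String newKey = entry.getValue();
            if (configProps.containsKey(oldKey)) {
                LOG.warning("Configuration key '" + oldKey + "' is deprecated; use '" + newKey + "' instead.");
                // remove() returns the old value; it is non-null because containsKey() was true.
                Object oldValue = configProps.remove(oldKey);
                if (!configProps.containsKey(newKey)) {
                    configProps.put(newKey, oldValue);
                }
            }
        }
        return configProps;
    }

    public static void main(String[] args) {
        Map<String, String> oldToNewKeys = new LinkedHashMap<>();
        // Old key from the diff; the new key name is a hypothetical placeholder.
        oldToNewKeys.put("flink.stream.describe.backoff.base", "flink.list.shards.backoff.base");

        Properties props = new Properties();
        props.setProperty("flink.stream.describe.backoff.base", "1000");
        replaceDeprecatedKeys(props, oldToNewKeys);
        System.out.println(props.getProperty("flink.list.shards.backoff.base")); // 1000
    }
}
```

Letting an explicitly set new key take precedence over the deprecated one is a design choice of this sketch; the thread does not specify what should happen when both are present.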


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481562
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

I have deprecated some old properties and added the new ones. This is my 
first time using the @Deprecated annotation, so do let me know if there are 
better ways of doing this. I used sample PRs like https://goo.gl/LWcrp2 as a 
reference for these changes.
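For reference, a common Java pattern pairs the `@Deprecated` annotation (which makes the compiler warn at use sites) with a Javadoc `@deprecated` tag that points readers to the replacement. The sketch below applies that pattern to the describe-stream key from the diff; the class name `ConsumerConfigKeys` and the replacement constant `LIST_SHARDS_BACKOFF_BASE` / `flink.list.shards.backoff.base` are hypothetical.

```java
import java.lang.reflect.Field;

public class ConsumerConfigKeys {

    /**
     * The base backoff time between each describeStream attempt.
     *
     * @deprecated Use {@link #LIST_SHARDS_BACKOFF_BASE} instead.
     */
    @Deprecated
    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

    /** The base backoff time between each listShards attempt (hypothetical key name). */
    public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

    public static void main(String[] args) throws NoSuchFieldException {
        // @Deprecated has runtime retention, so reflection can see it.
        Field oldKey = ConsumerConfigKeys.class.getField("STREAM_DESCRIBE_BACKOFF_BASE");
        Field newKey = ConsumerConfigKeys.class.getField("LIST_SHARDS_BACKOFF_BASE");
        System.out.println(oldKey.isAnnotationPresent(Deprecated.class)); // true
        System.out.println(newKey.isAnnotationPresent(Deprecated.class)); // false
    }
}
```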





---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481158
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
assertEquals(, clientConfiguration.getSocketTimeout());
}
-
--- End diff --

Done. Sorry for the inconvenience.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481152
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
--- End diff --

Done. Sorry for the inconvenience.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188378986
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
--- End diff --

remove unnecessary change


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188379050
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
assertEquals(, clientConfiguration.getSocketTimeout());
}
-
--- End diff --

remove unnecessary change


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188378324
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

I think that this should be one unit of work.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377393
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
-   final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
-   describeStreamRequest.setStreamName(streamName);
-   describeStreamRequest.setExclusiveStartShardId(startShardId);
+   private ListShardsResult listShards(String streamName, @Nullable String startShardId,
+   @Nullable String startNextToken)
+   throws InterruptedException {
+   final ListShardsRequest listShardsRequest = new ListShardsRequest();
+   if (startNextToken == null) {
+   listShardsRequest.setExclusiveStartShardId(startShardId);
+   listShardsRequest.setStreamName(streamName);
+   } else {
+   // Note the nextToken returned by AWS expires within 300 sec.
+   listShardsRequest.setNextToken(startNextToken);
+   }
 
-   DescribeStreamResult describeStreamResult = null;
+   ListShardsResult listShardsResults = null;
 
-   // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+   // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
int attemptCount = 0;
-   while (describeStreamResult == null) { // retry until we get a result
+   // List Shards returns just the first 1000 shard entries. Make sure that all entries
+   // are taken up.
+   while (listShardsResults == null) { // retry until we get a result
try {
-   describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+   listShardsResults = kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
-   describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-   LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
-   + backoffMillis + " millis.");
+   listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+   LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
+   + ". Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
-   } catch (ResourceNotFoundException re) {
-   throw new RuntimeException("Error while getting stream details", re);
-   }
-   }
-
-   String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
-   if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
-   "describeStream operation will not contain any shard information.");
+   } catch (ResourceInUseException reInUse) {
+   if (LOG.isWarnEnabled()) {
+   // List Shards will throw an exception if stream in not in active state. Will return
+   LOG.warn("The stream is currently not in active state. Reusing the older state "
+   + "for the time being");
+   break;
+   }
+   } catch 
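A minimal, self-contained sketch of the full-jitter retry loop this diff applies to `listShards`, assuming the usual full-jitter definition: each delay is drawn uniformly from [0, min(max, base * expConstant^attempt)). `ThrottledException` is a hypothetical stand-in for the SDK's `LimitExceededException`, and `callWithBackoff` is an illustrative helper, not the connector's API.

```java
import java.util.Random;
import java.util.concurrent.Callable;

public class FullJitterRetry {

    /** Hypothetical stand-in for the SDK's LimitExceededException. */
    static class ThrottledException extends RuntimeException {
        ThrottledException(String msg) {
            super(msg);
        }
    }

    private static final Random RANDOM = new Random();

    /** Full-jitter backoff: a uniformly random delay in [0, min(max, base * expConstant^attempt)). */
    static long fullJitterBackoff(long baseMillis, long maxMillis, double expConstant, int attempt) {
        long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(expConstant, attempt));
        return (long) (RANDOM.nextDouble() * exponentialBackoff);
    }

    /** Retries the call until it succeeds, sleeping with full-jitter backoff after each throttling error. */
    static <T> T callWithBackoff(Callable<T> call, long baseMillis, long maxMillis, double expConstant)
            throws Exception {
        int attemptCount = 0;
        while (true) {
            try {
                return call.call();
            } catch (ThrottledException e) {
                long backoffMillis = fullJitterBackoff(baseMillis, maxMillis, expConstant, attemptCount++);
                System.err.println("Throttled; backing off for " + backoffMillis + " millis.");
                Thread.sleep(backoffMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice with a throttling error, then succeeds.
        String result = callWithBackoff(() -> {
            if (calls[0]++ < 2) {
                throw new ThrottledException("rate exceeded");
            }
            return "shards";
        }, 10, 100, 2.0);
        System.out.println(result + " after " + calls[0] + " calls"); // shards after 3 calls
    }
}
```

Any non-retryable branch that is meant to end such a loop (like the `ResourceInUseException` case in the diff) needs its `break` or `return` outside logging guards such as `LOG.isWarnEnabled()`; otherwise the loop keeps retrying whenever that log level is disabled.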

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377434
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() {
}
 
@Test
-   public void testCustomConfigurationOverride() {
-   Properties configProps = new Properties();
-   configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-   KinesisProxy proxy = new KinesisProxy(configProps) {
-   @Override
-   protected AmazonKinesis createKinesisClient(Properties configProps) {
-   ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
-   clientConfig.setSocketTimeout(1);
-   return AWSUtil.createKinesisClient(configProps, clientConfig);
+   public void testGetShardList() throws Exception {
+   List<String> shardIds =
+   Arrays.asList(
+   "shardId-",
+   "shardId-0001",
+   "shardId-0002",
+   "shardId-0003");
+   shardIdSet = new HashSet<>(shardIds);
+   shards =
+   shardIds
+   .stream()
+   .map(shardId -> new Shard().withShardId(shardId))
+   .collect(Collectors.toList());
+   Properties kinesisConsumerConfig = new Properties();
+   kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+   kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey");
+   kinesisConsumerConfig.setProperty(
+   ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey");
+   KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+   AmazonKinesis mockClient = mock(AmazonKinesis.class);
+   Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient);
+
+   ListShardsResult responseWithMoreData =
+   new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
+   ListShardsResult responseFinal =
+   new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
+   doReturn(responseWithMoreData)
+   .when(mockClient)
+   .listShards(argThat(initialListShardsRequestMatcher()));
+   doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
+   HashMap<String, String> streamHashMap =
+   createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
+   GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap);
+
+   Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
+
+   Set<String> expectedStreams = new HashSet<>();
+   expectedStreams.add(fakeStreamName);
+   Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams);
+   List<StreamShardHandle> actualShardList =
+   shardListResult.getRetrievedShardListOfStream(fakeStreamName);
+   List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
+   System.out.println(actualShardList.toString());
+   assertThat(actualShardList, hasSize(4));
+   for (int i = 0; i < 4; i++) {
+   StreamShardHandle shardHandle =
+   new StreamShardHandle(
+   fakeStreamName,
+   new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
+   expectedStreamShard.add(shardHandle);
+   }
+
+   Assert.assertThat(
+   actualShardList,
+   containsInAnyOrder(
+   expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()])));
+   }
+
+   private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
+   private final String shardId;
+   private final String 

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377363
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

For the time being, I will keep the older names and deprecate the property 
names in a follow-up PR. I am not sure what the policy is with respect to 
change size; I feel that breaking this up into two PRs will make it easier to 
review. Let me know if you feel otherwise, and I will pull those changes into 
this PR.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377415
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
List<StreamShardHandle> shardsOfStream = new ArrayList<>();
 
-   DescribeStreamResult describeStreamResult;
+   // List Shards returns just the first 1000 shard entries. In order to read the entire stream,
+   // we need to use the returned nextToken to get additional shards.
+   ListShardsResult listShardsResult;
+   String startShardToken = null;
do {
-   describeStreamResult = describeStream(streamName, lastSeenShardId);
-
-   List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+   listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
--- End diff --

Thanks for catching this. I have fixed it by clearing shardsOfStream, so that 
we return an empty shardsOfStream in case of an ExpiredTokenException.
The intended behavior is: in the unlikely case of an expired next token, we 
just return an empty shardsOfStream. This should be fine, since an empty 
shardsOfStream is also what gets returned by default when no new shards are 
discovered.
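A simplified sketch of the pagination and expiry behavior described above, written against a hypothetical `ShardLister` interface instead of the AWS SDK so that it runs without dependencies. As in the diff, the first call starts from the stream name and every later call passes only the returned next token (which, per the diff's comment, expires within 300 sec). `Page`, `ShardLister`, and `ExpiredTokenException` are illustrative stand-ins, and `lastSeenShardId` handling is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ShardPagination {

    /** Hypothetical page type: a batch of shard IDs plus an optional continuation token. */
    static final class Page {
        final List<String> shardIds;
        final String nextToken; // null when there are no more pages

        Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /** Hypothetical stand-in for the exception thrown when a next token has expired. */
    static class ExpiredTokenException extends RuntimeException {
        ExpiredTokenException() {
            super("next token expired");
        }
    }

    /** Hypothetical client abstraction: nextToken == null means "start from the stream name". */
    interface ShardLister {
        Page list(String streamName, String nextToken);
    }

    /**
     * Follows nextToken until the last page. If a token expires midway, the partial result
     * is discarded and an empty list is returned, the same as when no new shards are found.
     */
    static List<String> getShardsOfStream(ShardLister lister, String streamName) {
        List<String> shardsOfStream = new ArrayList<>();
        String nextToken = null;
        do {
            Page page;
            try {
                page = lister.list(streamName, nextToken);
            } catch (ExpiredTokenException e) {
                shardsOfStream.clear();
                return shardsOfStream;
            }
            shardsOfStream.addAll(page.shardIds);
            nextToken = page.nextToken;
        } while (nextToken != null);
        return shardsOfStream;
    }

    public static void main(String[] args) {
        // Two pages, mirroring the two mocked responses in testGetShardList.
        ShardLister twoPages = (stream, token) -> token == null
                ? new Page(Arrays.asList("shardId-0", "shardId-1"), "NextToken")
                : new Page(Arrays.asList("shardId-2", "shardId-3"), null);
        System.out.println(getShardsOfStream(twoPages, "fake-stream").size()); // 4

        // The token expires between the first and second page.
        ShardLister expiring = (stream, token) -> {
            if (token != null) {
                throw new ExpiredTokenException();
            }
            return new Page(Collections.singletonList("shardId-0"), "NextToken");
        };
        System.out.println(getShardsOfStream(expiring, "fake-stream").isEmpty()); // true
    }
}
```

In the second case the partially collected first page is discarded, so the caller sees the same empty result it would get when no new shards were discovered.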


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377376
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
-   final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
-   describeStreamRequest.setStreamName(streamName);
-   describeStreamRequest.setExclusiveStartShardId(startShardId);
+   private ListShardsResult listShards(String streamName, @Nullable String startShardId,
+   @Nullable String startNextToken)
+   throws InterruptedException {
+   final ListShardsRequest listShardsRequest = new ListShardsRequest();
+   if (startNextToken == null) {
+   listShardsRequest.setExclusiveStartShardId(startShardId);
+   listShardsRequest.setStreamName(streamName);
+   } else {
+   // Note the nextToken returned by AWS expires within 300 sec.
+   listShardsRequest.setNextToken(startNextToken);
+   }
 
-   DescribeStreamResult describeStreamResult = null;
+   ListShardsResult listShardsResults = null;
 
-   // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+   // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
int attemptCount = 0;
-   while (describeStreamResult == null) { // retry until we get a result
+   // List Shards returns just the first 1000 shard entries. Make sure that all entries
+   // are taken up.
+   while (listShardsResults == null) { // retry until we get a result
try {
-   describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+   listShardsResults = kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
-   describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-   LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
-   + backoffMillis + " millis.");
+   listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+   LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
+   + ". Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
-   } catch (ResourceNotFoundException re) {
-   throw new RuntimeException("Error while getting stream details", re);
-   }
-   }
-
-   String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
-   if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
-   "describeStream operation will not contain any shard information.");
+   } catch (ResourceInUseException reInUse) {
+   if (LOG.isWarnEnabled()) {
+   // List Shards will throw an exception if stream in not in active state. Will return
+   LOG.warn("The stream is currently not in active state. Reusing the older state "
+   + "for the time being");
+   break;
+   }
+   } catch 

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788278
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -382,50 +386,62 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
-   final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
-   describeStreamRequest.setStreamName(streamName);
-   describeStreamRequest.setExclusiveStartShardId(startShardId);
+   private ListShardsResult listShards(String streamName, @Nullable String 
startShardId,
+   

@Nullable String startNextToken)
+   throws InterruptedException {
+   final ListShardsRequest listShardsRequest = new 
ListShardsRequest();
+   if (startNextToken == null) {
+   
listShardsRequest.setExclusiveStartShardId(startShardId);
+   listShardsRequest.setStreamName(streamName);
+   } else {
+   // Note the nextToken returned by AWS expires within 
300 sec.
+   listShardsRequest.setNextToken(startNextToken);
+   }
 
-   DescribeStreamResult describeStreamResult = null;
+   ListShardsResult listShardsResults = null;
 
-   // Call DescribeStream, with full-jitter backoff (if we get 
LimitExceededException).
+   // Call ListShards, with full-jitter backoff (if we get 
LimitExceededException).
int attemptCount = 0;
-   while (describeStreamResult == null) { // retry until we get a 
result
+   // List Shards returns just the first 1000 shard entries. Make 
sure that all entries
+   // are taken up.
+   while (listShardsResults == null) { // retry until we get a 
result
try {
-   describeStreamResult = 
kinesisClient.describeStream(describeStreamRequest);
+   listShardsResults = 
kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
-   describeStreamBaseBackoffMillis, 
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-   LOG.warn("Got LimitExceededException when 
describing stream " + streamName + ". Backing off for "
-   + backoffMillis + " millis.");
+   listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+   LOG.warn("Got LimitExceededException 
when listing shards from stream " + streamName
+   + ". 
Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
-   } catch (ResourceNotFoundException re) {
-   throw new RuntimeException("Error while getting 
stream details", re);
-   }
-   }
-
-   String streamStatus = 
describeStreamResult.getStreamDescription().getStreamStatus();
-   if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || 
streamStatus.equals(StreamStatus.UPDATING.toString( {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("The status of stream " + streamName + 
" is " + streamStatus + "; result of the current " +
-   "describeStream operation will not 
contain any shard information.");
+   } catch (ResourceInUseException reInUse) {
+   if (LOG.isWarnEnabled()) {
+   // List Shards will throw an exception 
if stream in not in active state. Will return
+   LOG.warn("The stream is currently not 
in active state. Reusing the older state "
+   + "for the time being");
+   break;
+   }
+   } catch (ResourceNotFoundException 
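
The quoted loop retries `listShards` with full-jitter backoff on `LimitExceededException`. Below is a minimal, self-contained sketch of that pattern, assuming Flink's `fullJitterBackoff(base, max, expConstant, attempt)` follows the standard full-jitter rule (a uniform random delay in `[0, min(max, base * expConstant^attempt))`). `FullJitterRetry`, `ThrottledException` and `NotActiveException` are hypothetical stand-ins for the Kinesis client call and its `LimitExceededException` / `ResourceInUseException`. One detail worth noting: in the quoted hunk the `break` for `ResourceInUseException` sits inside `if (LOG.isWarnEnabled())`, so the loop only exits when WARN logging is enabled; the sketch exits unconditionally.

```java
import java.util.Random;
import java.util.function.Supplier;

/** Hypothetical sketch of a full-jitter retry loop; not the connector's actual code. */
final class FullJitterRetry {

    /** Hypothetical stand-in for a throttling error such as LimitExceededException. */
    static final class ThrottledException extends RuntimeException {
        ThrottledException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for "stream not ACTIVE" (ResourceInUseException). */
    static final class NotActiveException extends RuntimeException {
        NotActiveException(String message) {
            super(message);
        }
    }

    private FullJitterRetry() {
    }

    /** Returns a delay drawn uniformly from [0, min(max, base * expConstant^attempt)). */
    static long fullJitterBackoff(long base, long max, double expConstant, int attempt, Random random) {
        long cap = (long) Math.min(max, base * Math.pow(expConstant, attempt));
        return (long) (random.nextDouble() * cap);
    }

    /**
     * Invokes {@code call} until it succeeds. Throttling leads to a jittered sleep;
     * a not-active stream ends the loop with {@code null} so the caller can keep
     * the shards it discovered earlier.
     */
    static <T> T callWithBackoff(Supplier<T> call, long base, long max, double expConstant, Random random)
            throws InterruptedException {
        int attempt = 0;
        while (true) {
            try {
                return call.get();
            } catch (ThrottledException e) {
                Thread.sleep(fullJitterBackoff(base, max, expConstant, attempt++, random));
            } catch (NotActiveException e) {
                // Exit regardless of the configured log level.
                return null;
            }
        }
    }
}
```

A caller would treat a `null` result as "keep the previously discovered shards", which is what the "Reusing the older state" log message describes.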

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788338
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
List<StreamShardHandle> shardsOfStream = new ArrayList<>();
 
-   DescribeStreamResult describeStreamResult;
+   // List Shards returns just the first 1000 shard entries. In order to read the entire stream,
+   // we need to use the returned nextToken to get additional shards.
+   ListShardsResult listShardsResult;
+   String startShardToken = null;
do {
-   describeStreamResult = describeStream(streamName, lastSeenShardId);
-
-   List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+   listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
--- End diff --

If, within `listShards(...)`, we catch the `ExpiredNextTokenException`, then `null` will be returned as the result, correct?
If so, the `shardsOfStream` list built up so far will be returned immediately, regardless of whether or not more shards follow.

Although expired tokens might not be very common here, I wonder if we can handle this case more gracefully (e.g., by re-fetching a token to make sure that there really are no more shards).
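
One way to avoid returning a partial list in the expired-token case is to restart the listing with a fresh request that starts after the newest shard already collected. A minimal sketch follows; `ShardLister`, `Page` and `ExpiredTokenException` are hypothetical stand-ins for `AmazonKinesis.listShards`, `ListShardsResult` and `ExpiredNextTokenException`, and the resume step assumes (as the quoted `listShards(streamName, lastSeenShardId, ...)` call already does) that a request with an exclusive start shard id returns only the shards after it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of nextToken pagination that survives an expired token. */
final class ShardPagination {

    /** One page of shard ids; a null nextToken marks the last page. */
    static final class Page {
        final List<String> shardIds;
        final String nextToken;

        Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /** Hypothetical stand-in for ExpiredNextTokenException. */
    static final class ExpiredTokenException extends RuntimeException {
        ExpiredTokenException() {
            super("next token expired");
        }
    }

    /** Hypothetical client call: a null token starts a fresh listing after exclusiveStartShardId. */
    interface ShardLister {
        Page list(String streamName, String exclusiveStartShardId, String nextToken);
    }

    private ShardPagination() {
    }

    /**
     * Follows nextToken until it is null. On an expired token, restarts with a
     * fresh request after the last shard already collected, instead of
     * returning a partial list. Fails after maxRestarts restarts.
     */
    static List<String> listAllShards(ShardLister lister, String streamName, String lastSeenShardId, int maxRestarts) {
        List<String> shardIds = new ArrayList<>();
        String startShardId = lastSeenShardId;
        String token = null;
        int restarts = 0;
        while (true) {
            Page page;
            try {
                page = lister.list(streamName, startShardId, token);
            } catch (ExpiredTokenException e) {
                if (++restarts > maxRestarts) {
                    throw new IllegalStateException("shard listing kept expiring", e);
                }
                if (!shardIds.isEmpty()) {
                    startShardId = shardIds.get(shardIds.size() - 1);
                }
                token = null;
                continue;
            }
            shardIds.addAll(page.shardIds);
            if (page.nextToken == null) {
                return shardIds;
            }
            token = page.nextToken;
        }
    }
}
```

The `maxRestarts` bound keeps a stream whose tokens expire on every attempt from looping forever; the caller decides whether to fail or retry on the next discovery interval.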


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788213
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

Changing the name of this variable, strictly speaking, breaks backwards compatibility, as users might be using it.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788363
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java ---
@@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() {
}
 
@Test
-   public void testCustomConfigurationOverride() {
-   Properties configProps = new Properties();
-   configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-   KinesisProxy proxy = new KinesisProxy(configProps) {
-   @Override
-   protected AmazonKinesis createKinesisClient(Properties configProps) {
-   ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
-   clientConfig.setSocketTimeout(1);
-   return AWSUtil.createKinesisClient(configProps, clientConfig);
+   public void testGetShardList() throws Exception {
+   List<String> shardIds =
+   Arrays.asList(
+   "shardId-",
+   "shardId-0001",
+   "shardId-0002",
+   "shardId-0003");
+   shardIdSet = new HashSet<>(shardIds);
+   shards =
+   shardIds
+   .stream()
+   .map(shardId -> new Shard().withShardId(shardId))
+   .collect(Collectors.toList());
+   Properties kinesisConsumerConfig = new Properties();
+   kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+   kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey");
+   kinesisConsumerConfig.setProperty(
+   ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey");
+   KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
+   AmazonKinesis mockClient = mock(AmazonKinesis.class);
+   Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient);
+
+   ListShardsResult responseWithMoreData =
+   new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
+   ListShardsResult responseFinal =
+   new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null);
+   doReturn(responseWithMoreData)
+   .when(mockClient)
+   .listShards(argThat(initialListShardsRequestMatcher()));
+   doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
+   HashMap<String, String> streamHashMap =
+   createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
+   GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap);
+
+   Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
+
+   Set<String> expectedStreams = new HashSet<>();
+   expectedStreams.add(fakeStreamName);
+   Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams);
+   List<StreamShardHandle> actualShardList =
+   shardListResult.getRetrievedShardListOfStream(fakeStreamName);
+   List<StreamShardHandle> expectedStreamShard = new ArrayList<>();
+   System.out.println(actualShardList.toString());
+   assertThat(actualShardList, hasSize(4));
+   for (int i = 0; i < 4; i++) {
+   StreamShardHandle shardHandle =
+   new StreamShardHandle(
+   fakeStreamName,
+   new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
+   expectedStreamShard.add(shardHandle);
+   }
+
+   Assert.assertThat(
+   actualShardList,
+   containsInAnyOrder(
+   expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()])));
+   }
+
+   private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
+   private final String shardId;
+   private final String 

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788223
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

The same goes for all other key variable renames in this class.
I would suggest deprecating the existing ones if we want to change the names internally.
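
A minimal sketch of the deprecation route suggested above, assuming the rename is purely internal: the old constant stays, marked `@Deprecated`, and the new name (`LIST_SHARDS_BACKOFF_BASE` here is a hypothetical name) maps to the same property string, so both existing user code and existing configuration files keep working.

```java
/** Hypothetical excerpt; only the old constant and its value come from the diff. */
final class ConsumerConfigAliasSketch {

    private ConsumerConfigAliasSketch() {
    }

    /** The base backoff time between each listShards attempt. */
    public static final String LIST_SHARDS_BACKOFF_BASE = "flink.stream.describe.backoff.base";

    /**
     * The base backoff time between each describeStream attempt.
     *
     * @deprecated Use {@link #LIST_SHARDS_BACKOFF_BASE}; it refers to the same property key.
     */
    @Deprecated
    public static final String STREAM_DESCRIBE_BACKOFF_BASE = LIST_SHARDS_BACKOFF_BASE;
}
```

Because the key string is unchanged, setting either constant in a `Properties` object writes the same entry; user code that still references the old name only gets a compiler deprecation warning.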


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788284
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
-   final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
-   describeStreamRequest.setStreamName(streamName);
-   describeStreamRequest.setExclusiveStartShardId(startShardId);
+   private ListShardsResult listShards(String streamName, @Nullable String startShardId,
+   @Nullable String startNextToken)
+   throws InterruptedException {
+   final ListShardsRequest listShardsRequest = new ListShardsRequest();
+   if (startNextToken == null) {
+   listShardsRequest.setExclusiveStartShardId(startShardId);
+   listShardsRequest.setStreamName(streamName);
+   } else {
+   // Note the nextToken returned by AWS expires within 300 sec.
+   listShardsRequest.setNextToken(startNextToken);
+   }
 
-   DescribeStreamResult describeStreamResult = null;
+   ListShardsResult listShardsResults = null;
 
-   // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+   // Call ListShards, with full-jitter backoff (if we get LimitExceededException).
int attemptCount = 0;
-   while (describeStreamResult == null) { // retry until we get a result
+   // List Shards returns just the first 1000 shard entries. Make sure that all entries
+   // are taken up.
+   while (listShardsResults == null) { // retry until we get a result
try {
-   describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
+   listShardsResults = kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
-   describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-   LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
-   + backoffMillis + " millis.");
+   listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+   LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
+   + ". Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
-   } catch (ResourceNotFoundException re) {
-   throw new RuntimeException("Error while getting stream details", re);
-   }
-   }
-
-   String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
-   if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
-   "describeStream operation will not contain any shard information.");
+   } catch (ResourceInUseException reInUse) {
+   if (LOG.isWarnEnabled()) {
+   // List Shards will throw an exception if stream in not in active state. Will return
+   LOG.warn("The stream is currently not in active state. Reusing the older state "
+   + "for the time being");
+   break;
+   }
+   } catch (ResourceNotFoundException 
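
The request-building branch in the quoted `listShards(...)` sets either the stream name and exclusive start shard id, or the next token, never both. That matches the AWS ListShards API reference, which states that `NextToken` cannot be combined with `StreamName` or `ExclusiveStartShardId` and that a token expires after 300 seconds. A minimal sketch that makes the two request shapes explicit; `ListShardsRequestSketch` and its factory methods are hypothetical and independent of the AWS SDK's `ListShardsRequest`.

```java
/** Hypothetical value type: only the two valid request shapes can be built. */
final class ListShardsRequestSketch {
    final String streamName;
    final String exclusiveStartShardId;
    final String nextToken;

    private ListShardsRequestSketch(String streamName, String exclusiveStartShardId, String nextToken) {
        this.streamName = streamName;
        this.exclusiveStartShardId = exclusiveStartShardId;
        this.nextToken = nextToken;
    }

    /** First page: name the stream and optionally skip shards that were already seen. */
    static ListShardsRequestSketch firstPage(String streamName, String exclusiveStartShardId) {
        if (streamName == null) {
            throw new IllegalArgumentException("streamName is required for the first page");
        }
        return new ListShardsRequestSketch(streamName, exclusiveStartShardId, null);
    }

    /** Later pages: the token alone identifies the stream and the position. */
    static ListShardsRequestSketch nextPage(String nextToken) {
        if (nextToken == null) {
            throw new IllegalArgumentException("nextToken is required for later pages");
        }
        return new ListShardsRequestSketch(null, null, nextToken);
    }
}
```

Making the constructor private means an invalid combination cannot be expressed at all, rather than being rejected by the service at call time.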

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-11 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187632455
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
/** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

I have kept the property string values the same but changed the variable names to refer to listShards, for readability.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-11 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187630051
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
/** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

Would it be better to introduce a new set of constants for listShards? Semantically they are equivalent, so keeping the same names for listShards should be okay, but it is a little confusing.
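
If a new set of property keys were introduced instead, existing configurations would only keep working if the connector also reads the old key. A minimal sketch of such a fallback, assuming a hypothetical new key `flink.shard.discovery.backoff.base`; only `flink.stream.describe.backoff.base` comes from the diff, and `BackoffConfigSketch` is an illustrative name.

```java
import java.util.Properties;

/** Hypothetical sketch: prefer a new key, fall back to the old one, then to a default. */
final class BackoffConfigSketch {

    private BackoffConfigSketch() {
    }

    /** Hypothetical new key name, for illustration only. */
    static final String NEW_BACKOFF_BASE_KEY = "flink.shard.discovery.backoff.base";

    /** Existing key quoted in the diff; still honored so old configs keep working. */
    static final String OLD_BACKOFF_BASE_KEY = "flink.stream.describe.backoff.base";

    static long backoffBaseMillis(Properties props, long defaultMillis) {
        String value = props.getProperty(NEW_BACKOFF_BASE_KEY);
        if (value == null) {
            value = props.getProperty(OLD_BACKOFF_BASE_KEY);
        }
        return (value == null) ? defaultMillis : Long.parseLong(value.trim());
    }
}
```

This is the extra lookup that keeping the original key strings (as discussed in this thread) avoids.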


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-11 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187627524
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---
@@ -66,13 +66,13 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
/** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

This would be a breaking change. We should leave these properties as is, if 
they are semantically equivalent.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-11 Thread kailashhd
GitHub user kailashhd opened a pull request:

https://github.com/apache/flink/pull/5992

[FLINK-8944] [Kinesis Connector] Use listShards instead of DescribeStream for shard discovery as it offers higher rate limits

## What is the purpose of the change

ListShards has higher AWS rate limits than DescribeStream (whose limit applies at the AWS account level), which allows the Kinesis data source to discover shards faster when streams are changed (re-sharded).

## Brief change log
 - Change the Kinesis connector to use listShards instead of DescribeStream for shard discovery.

## Verifying this change

This change added tests and can be verified as follows:
 - Added a unit test that checks the code path with the Kinesis dependencies mocked out.
 - Tested by running a small Flink job with this connector.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / *no*)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / *no*)
  - The serializers: (yes / *no* / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / *no* 
/ don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know)
  - The S3 file system connector: (yes / *no* / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / *no*)
  - If yes, how is the feature documented? (*not applicable* / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kailashhd/flink KinesisProxy

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5992.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5992


commit 3188f24b13c9009e977b6fb25da4d40c93fc811e
Author: Kailash HD 
Date:   2018-03-26T16:42:25Z

[FLINK-8944] [Kinesis Connector] Use listShards instead of DescribeStream 
for shard discovery as it offer higher rate limits




---