[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...

2018-02-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5337


---


[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...

2018-01-23 Thread pluppens
Github user pluppens commented on a diff in the pull request:

https://github.com/apache/flink/pull/5337#discussion_r163226460
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
@@ -515,6 +515,56 @@ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
         assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
     }
 
+    /**
+     * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
+     * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
+     * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
+     * job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
+     * {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
+     */
+    @Test
+    public void testFindSequenceNumberToRestoreFrom() {
--- End diff --

Makes sense. I'll look into it and see if I can find a way to test it as a 
whole.


---


[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...

2018-01-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5337#discussion_r163175349
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -267,6 +269,27 @@ public void run(SourceContext sourceContext) throws Exception {
         sourceContext.close();
     }
 
+    /**
+     * Tries to find the {@link SequenceNumber} for a given {@code kinesisStreamShard} in the list of the restored stream shards' metadata,
+     * by comparing the stream name and shard id for equality.
+     * @param current                 the current Kinesis shard we're trying to find the restored sequence number for
+     * @param sequenceNumsToRestore   the restored sequence numbers
+     * @return the sequence number, if any, or {@code null} if no matching shard is found
+     */
+    @VisibleForTesting
+    SequenceNumber findSequenceNumberToRestoreFrom(StreamShardMetadata current, HashMap<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore) {
+        checkNotNull(current.getStreamName(), "Stream name not set on the current metadata shard");
+        checkNotNull(current.getShardId(), "Shard id not set on the current metadata shard");
+
+        for (final Map.Entry<StreamShardMetadata, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
--- End diff --

The `final` is redundant here.


---


[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...

2018-01-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5337#discussion_r163176506
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
@@ -515,6 +515,56 @@ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
         assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
     }
 
+    /**
+     * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
+     * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
+     * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
+     * job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
+     * {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
+     */
+    @Test
+    public void testFindSequenceNumberToRestoreFrom() {
--- End diff --

Nice that a unit test was added for the method!

However, I think it would be even better to also include a test that uses 
the `AbstractStreamOperatorTestHarness` to exercise this restore path. That 
utility class can take a snapshot of the wrapped operator's state and then 
restore the operator from that snapshot.


---


[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...

2018-01-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5337#discussion_r163175172
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -210,16 +210,18 @@ public void run(SourceContext sourceContext) throws Exception {
             for (StreamShardHandle shard : allShards) {
                 StreamShardMetadata kinesisStreamShard = KinesisDataFetcher.convertToStreamShardMetadata(shard);
                 if (sequenceNumsToRestore != null) {
-                    if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
+                    // find the sequence number for the given converted kinesis shard in our restored state
--- End diff --

nit: Capital 'K' for Kinesis


---


[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...

2018-01-23 Thread pluppens
GitHub user pluppens opened a pull request:

https://github.com/apache/flink/pull/5337

[FLINK-8484][flink-kinesis-connector] Ensure a Kinesis consumer snapshot 
restoration is able to handle recently closed shards

FLINK-8484: ensure that a shard can still be restored when any field of its 
`StreamShardMetadata` other than `StreamShardMetadata.shardId` or 
`StreamShardMetadata.streamName` has changed. This handles the corner case 
where a shard has been closed (its ending sequence number is now non-null) 
between the last savepoint or checkpoint and the restart of the job from that 
snapshot.

## Brief change log
 - Created a new method to perform the sequence number lookup
 - Ensured that the lookup for a given existing Kinesis shard does not rely on 
`equals()`, but instead compares only the stream name and shard id
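
To illustrate why the lookup cannot rely on `equals()`, here is a minimal, self-contained sketch. It does not use the real connector classes: `ShardMeta` is a hypothetical, simplified stand-in for `StreamShardMetadata`, sequence numbers are plain strings, and the assumption that `equals()` and `hashCode()` cover the ending sequence number follows from the problem described above rather than from the actual class source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ShardRestoreSketch {

    /** Hypothetical, simplified stand-in for StreamShardMetadata. */
    static final class ShardMeta {
        final String streamName;
        final String shardId;
        final String endingSequenceNumber; // null while the shard is still open

        ShardMeta(String streamName, String shardId, String endingSequenceNumber) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.endingSequenceNumber = endingSequenceNumber;
        }

        // Assumption: equality covers every field, including the ending sequence number.
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ShardMeta)) {
                return false;
            }
            ShardMeta other = (ShardMeta) o;
            return Objects.equals(streamName, other.streamName)
                && Objects.equals(shardId, other.shardId)
                && Objects.equals(endingSequenceNumber, other.endingSequenceNumber);
        }

        @Override
        public int hashCode() {
            return Objects.hash(streamName, shardId, endingSequenceNumber);
        }
    }

    /** Looks up the restored sequence number by stream name and shard id only. */
    static String findSequenceNumberToRestoreFrom(ShardMeta current, Map<ShardMeta, String> restored) {
        Objects.requireNonNull(current.streamName, "Stream name not set");
        Objects.requireNonNull(current.shardId, "Shard id not set");

        for (Map.Entry<ShardMeta, String> entry : restored.entrySet()) {
            ShardMeta candidate = entry.getKey();
            if (current.streamName.equals(candidate.streamName)
                    && current.shardId.equals(candidate.shardId)) {
                return entry.getValue();
            }
        }
        return null; // no matching shard in the restored state
    }

    public static void main(String[] args) {
        // State captured while the shard was still open.
        Map<ShardMeta, String> restored = new HashMap<>();
        restored.put(new ShardMeta("stream-a", "shardId-000000000000", null), "seq-42");

        // After the restore, the same shard has been closed by re-sharding.
        ShardMeta closed = new ShardMeta("stream-a", "shardId-000000000000", "seq-99");

        System.out.println(restored.containsKey(closed));                      // false
        System.out.println(findSequenceNumberToRestoreFrom(closed, restored)); // seq-42
    }
}
```

The linear scan trades the constant-time map lookup for correctness; that seems acceptable here because the lookup runs once per shard at restore time and not on the per-record path.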


## Verifying this change

This change added tests and can be verified as follows:
 - A new unit test was added in `FlinkKinesisConsumerTest` called 
`testFindSequenceNumberToRestoreFrom()` which tests the lookup mechanism

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pluppens/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5337.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5337


commit 5c756390002a2e1c00c7368bea3e1135b7722a20
Author: Philip Luppens 
Date:   2018-01-23T08:00:23Z

FLINK-8484: ensure that a state change in the StreamShardMetadata other 
than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not 
result in the shard not being able to be restored. This handles the corner case 
where a shard might have been closed (ending sequence number set to not-null) 
since the last savepoint or checkpoint when a job is restarted from a snapshot 
state.




---