[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5337 ---
[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...
Github user pluppens commented on a diff in the pull request: https://github.com/apache/flink/pull/5337#discussion_r163226460

--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---

@@ -515,6 +515,56 @@ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
     assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
 }

+/**
+ * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
+ * {@link StreamShardMetadata#streamName} does not prevent the shard from being restored.
+ * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
+ * job restore, the shard has been closed (ending sequence number set) due to re-sharding, and we can no longer rely on
+ * {@link StreamShardMetadata#equals(Object)} to look up the sequence number in the collection of restored shard metadata.
+ */
+@Test
+public void testFindSequenceNumberToRestoreFrom() {

--- End diff --

Makes sense. I'll look into it and see if I can find a way to test it as a whole. ---
[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5337#discussion_r163175349

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---

@@ -267,6 +269,27 @@ public void run(SourceContext sourceContext) throws Exception {
     sourceContext.close();
 }

+/**
+ * Tries to find the {@link SequenceNumber} for a given {@code kinesisStreamShard} in the list of the restored stream shards' metadata,
+ * by comparing the stream name and shard id for equality.
+ *
+ * @param current the current Kinesis shard we're trying to find the restored sequence number for
+ * @param sequenceNumsToRestore the restored sequence numbers
+ * @return the sequence number, if any, or {@code null} if no matching shard is found
+ */
+@VisibleForTesting
+SequenceNumber findSequenceNumberToRestoreFrom(StreamShardMetadata current, HashMap<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore) {
+    checkNotNull(current.getStreamName(), "Stream name not set on the current metadata shard");
+    checkNotNull(current.getShardId(), "Shard id not set on the current metadata shard");
+
+    for (final Map.Entry<StreamShardMetadata, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {

--- End diff --

The `final` is redundant here. ---
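For readers following along, the idea behind the method under review can be sketched in plain Java. This is a hypothetical, self-contained simplification — `ShardMeta` is a stand-in for Flink's `StreamShardMetadata`, reduced to the three fields that matter here, and sequence numbers are plain strings. It shows both the bug (a `HashMap` lookup misses once the mutable ending sequence number changes) and the fix (match on stream name and shard id only):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SequenceNumberLookup {

    // Hypothetical stand-in for StreamShardMetadata, reduced to the relevant fields.
    static class ShardMeta {
        final String streamName;
        final String shardId;
        String endingSequenceNumber; // mutable: set when a shard is closed by re-sharding

        ShardMeta(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ShardMeta)) {
                return false;
            }
            ShardMeta other = (ShardMeta) o;
            // Full-field equality, like StreamShardMetadata#equals(Object):
            // includes the mutable ending sequence number.
            return Objects.equals(streamName, other.streamName)
                    && Objects.equals(shardId, other.shardId)
                    && Objects.equals(endingSequenceNumber, other.endingSequenceNumber);
        }

        @Override
        public int hashCode() {
            return Objects.hash(streamName, shardId, endingSequenceNumber);
        }
    }

    /**
     * Finds the restored sequence number by comparing only the stable identity
     * (stream name + shard id), ignoring fields that may have changed between
     * snapshot and restore. Returns null if no matching shard is found.
     */
    static String findSequenceNumberToRestoreFrom(ShardMeta current, Map<ShardMeta, String> restored) {
        for (Map.Entry<ShardMeta, String> entry : restored.entrySet()) {
            ShardMeta stored = entry.getKey();
            if (stored.streamName.equals(current.streamName)
                    && stored.shardId.equals(current.shardId)) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Snapshot taken while the shard was still open (no ending sequence number).
        Map<ShardMeta, String> restored = new HashMap<>();
        restored.put(new ShardMeta("my-stream", "shardId-000"), "seq-42");

        // Same shard rediscovered after restore, but closed by re-sharding in between.
        ShardMeta closed = new ShardMeta("my-stream", "shardId-000");
        closed.endingSequenceNumber = "999";

        // The plain map lookup misses, because equals()/hashCode() now differ.
        System.out.println(restored.containsKey(closed));

        // The name+id comparison still finds the restored sequence number.
        System.out.println(findSequenceNumberToRestoreFrom(closed, restored));
    }
}
```

Running the sketch prints `false` for the full-equality map lookup and `seq-42` for the name+id lookup, which is exactly the corner case the patch addresses.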
[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5337#discussion_r163176506

--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---

@@ -515,6 +515,56 @@ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
     assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
 }

+/**
+ * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
+ * {@link StreamShardMetadata#streamName} does not prevent the shard from being restored.
+ * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
+ * job restore, the shard has been closed (ending sequence number set) due to re-sharding, and we can no longer rely on
+ * {@link StreamShardMetadata#equals(Object)} to look up the sequence number in the collection of restored shard metadata.
+ */
+@Test
+public void testFindSequenceNumberToRestoreFrom() {

--- End diff --

Nice that a unit test is added for the method! However, I think it would be even better if we also included a test which uses the `AbstractStreamOperatorTestHarness` to test this restore. That utility class allows taking a snapshot of the wrapped operator's state and restoring it again from that snapshot. ---
[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5337#discussion_r163175172

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---

@@ -210,16 +210,18 @@ public void run(SourceContext sourceContext) throws Exception {
 for (StreamShardHandle shard : allShards) {
     StreamShardMetadata kinesisStreamShard = KinesisDataFetcher.convertToStreamShardMetadata(shard);
     if (sequenceNumsToRestore != null) {
-        if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
+            // find the sequence number for the given converted kinesis shard in our restored state

--- End diff --

nit: Capital 'K' for Kinesis. ---
[GitHub] flink pull request #5337: [FLINK-8484][flink-kinesis-connector] Ensure a Kin...
GitHub user pluppens opened a pull request: https://github.com/apache/flink/pull/5337

[FLINK-8484][flink-kinesis-connector] Ensure a Kinesis consumer snapshot restoration is able to handle recently closed shards

FLINK-8484: ensure that a state change in the StreamShardMetadata other than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not prevent the shard from being restored. This handles the corner case where a shard might have been closed (ending sequence number set to non-null) since the last savepoint or checkpoint when a job is restarted from a snapshot state.

## Brief change log

- Created a new method to perform the sequence number lookup
- Ensured that a lookup for a given existing Kinesis shard does not rely on `equals()`, but rather checks for equality of the stream name and shard id only

## Verifying this change

This change added tests and can be verified as follows:

- A new unit test was added in `FlinkKinesisConsumerTest` called `testFindSequenceNumberToRestoreFrom()` which tests the lookup mechanism

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pluppens/flink master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5337.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5337

commit 5c756390002a2e1c00c7368bea3e1135b7722a20
Author: Philip Luppens
Date: 2018-01-23T08:00:23Z

    FLINK-8484: ensure that a state change in the StreamShardMetadata other than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not prevent the shard from being restored. This handles the corner case where a shard might have been closed (ending sequence number set to non-null) since the last savepoint or checkpoint when a job is restarted from a snapshot state. ---
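As a design aside (not part of the patch above): an alternative to scanning the restored entries on every lookup would be to key the restored state by the shard's stable identity only — stream name plus shard id — so that a plain `HashMap` lookup stays correct even after a shard is closed. A minimal, self-contained sketch under that assumption, with `ShardKey` as a hypothetical helper class:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class StableShardKeyDemo {

    // Hypothetical key class: only the fields of a Kinesis shard that never
    // change, so equals()/hashCode() are stable across re-sharding.
    static final class ShardKey {
        final String streamName;
        final String shardId;

        ShardKey(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ShardKey)) {
                return false;
            }
            ShardKey other = (ShardKey) o;
            return streamName.equals(other.streamName) && shardId.equals(other.shardId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(streamName, shardId);
        }
    }

    public static void main(String[] args) {
        // Restored state keyed by the stable identity: O(1) lookup that is
        // immune to the shard being closed (ending sequence number set) later.
        Map<ShardKey, String> restored = new HashMap<>();
        restored.put(new ShardKey("my-stream", "shardId-000"), "seq-42");

        // After re-sharding, the rediscovered shard still maps to the same key.
        String seq = restored.get(new ShardKey("my-stream", "shardId-000"));
        System.out.println(seq);
    }
}
```

Printing `seq-42` here confirms the lookup survives the metadata change. The trade-off is that the restored state's key type would no longer carry the full shard metadata, which is why the PR instead keeps the existing `HashMap` and adds a name+id comparison at lookup time.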