[4/6] flink git commit: [FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in Kinesis connector.
[FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in Kinesis connector. This closes #5480. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17d6b495 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17d6b495 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17d6b495 Branch: refs/heads/release-1.4 Commit: 17d6b49583492f0f3b7fe9db5ffcd508fa324864 Parents: a231c3c Author: Thomas Weise Authored: Tue Feb 13 16:33:59 2018 -0800 Committer: Thomas Weise Committed: Sun Jun 17 20:59:01 2018 +0200 -- .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/17d6b495/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java -- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index abce21b..c00e190 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -496,7 +496,7 @@ public class KinesisDataFetcher { *when the shard state was registered. * @param lastSequenceNumber the last sequence number value to update */ - protected final void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { + protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { synchronized (checkpointLock) { sourceContext.collectWithTimestamp(record, recordTimestamp); updateState(shardStateIndex, lastSequenceNumber);
flink git commit: [FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in Kinesis connector.
Repository: flink Updated Branches: refs/heads/master eeac022f0 -> e75481cc6 [FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in Kinesis connector. This closes #5480. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e75481cc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e75481cc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e75481cc Branch: refs/heads/master Commit: e75481cc6c5240e976f7eb829152153731fb63bf Parents: eeac022 Author: Thomas Weise Authored: Tue Feb 13 16:33:59 2018 -0800 Committer: Tzu-Li (Gordon) Tai Committed: Fri Feb 23 19:48:19 2018 +0800 -- .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e75481cc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java -- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 945f396..65de24c 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -513,7 +513,7 @@ public class KinesisDataFetcher { *when the shard state was registered. * @param lastSequenceNumber the last sequence number value to update */ - protected final void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { + protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { synchronized (checkpointLock) { if (record != null) { sourceContext.collectWithTimestamp(record, recordTimestamp);