[4/6] flink git commit: [FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in Kinesis connector.

2018-06-22 Thread tzulitai
[FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in 
Kinesis connector.

This closes #5480.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17d6b495
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17d6b495
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17d6b495

Branch: refs/heads/release-1.4
Commit: 17d6b49583492f0f3b7fe9db5ffcd508fa324864
Parents: a231c3c
Author: Thomas Weise 
Authored: Tue Feb 13 16:33:59 2018 -0800
Committer: Thomas Weise 
Committed: Sun Jun 17 20:59:01 2018 +0200

--
 .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/17d6b495/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
--
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index abce21b..c00e190 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -496,7 +496,7 @@ public class KinesisDataFetcher {
 *when the shard state was registered.
 * @param lastSequenceNumber the last sequence number value to update
 */
-    protected final void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+    protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
         synchronized (checkpointLock) {
             sourceContext.collectWithTimestamp(record, recordTimestamp);
             updateState(shardStateIndex, lastSequenceNumber);



flink git commit: [FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in Kinesis connector.

2018-02-23 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/master eeac022f0 -> e75481cc6


[FLINK-8648] [kinesis] Allow for customization of emitRecordAndUpdateState in 
Kinesis connector.

This closes #5480.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e75481cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e75481cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e75481cc

Branch: refs/heads/master
Commit: e75481cc6c5240e976f7eb829152153731fb63bf
Parents: eeac022
Author: Thomas Weise 
Authored: Tue Feb 13 16:33:59 2018 -0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Fri Feb 23 19:48:19 2018 +0800

--
 .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e75481cc/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
--
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 945f396..65de24c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -513,7 +513,7 @@ public class KinesisDataFetcher {
 *when the shard state was registered.
 * @param lastSequenceNumber the last sequence number value to update
 */
-    protected final void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
+    protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
         synchronized (checkpointLock) {
             if (record != null) {
                 sourceContext.collectWithTimestamp(record, recordTimestamp);
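

For illustration only, and not part of the commit: dropping `final` means a subclass of the fetcher can now override emitRecordAndUpdateState. The sketch below shows the kind of customization that becomes possible. It uses stand-in classes rather than the real Flink types, so SketchFetcher, FilteringFetcher, the `emitted` and `lastSeq` lists, and the String sequence number are all hypothetical names chosen for this example. It also assumes, going by the hunks above, that the state update follows the emit inside the checkpoint lock and that a null record is skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Small demo entry point for the sketch.
class EmitOverrideSketch {
    public static void main(String[] args) {
        FilteringFetcher<String> fetcher = new FilteringFetcher<>(1, s -> !s.isEmpty());
        fetcher.emitRecordAndUpdateState("payload", 1L, 0, "seq-1");
        fetcher.emitRecordAndUpdateState("", 2L, 0, "seq-2"); // filtered out
        System.out.println(fetcher.emitted + " " + fetcher.lastSeq);
    }
}

// Hypothetical stand-in for the fetcher; models only what the diff shows.
class SketchFetcher<T> {
    protected final Object checkpointLock = new Object();
    // Stands in for sourceContext.collectWithTimestamp(...).
    protected final List<T> emitted = new ArrayList<>();
    // Stands in for the per-shard last sequence number state.
    protected final List<String> lastSeq = new ArrayList<>();

    SketchFetcher(int shardCount) {
        for (int i = 0; i < shardCount; i++) {
            lastSeq.add(null);
        }
    }

    // Non-final, mirroring the signature after this commit.
    protected void emitRecordAndUpdateState(
            T record, long recordTimestamp, int shardStateIndex, String lastSequenceNumber) {
        synchronized (checkpointLock) {
            if (record != null) {
                emitted.add(record);
            }
            updateState(shardStateIndex, lastSequenceNumber);
        }
    }

    protected void updateState(int shardStateIndex, String lastSequenceNumber) {
        lastSeq.set(shardStateIndex, lastSequenceNumber);
    }
}

// Hypothetical subclass: drops records that fail a predicate, but still
// advances the sequence number so they are not re-read after a restore.
class FilteringFetcher<T> extends SketchFetcher<T> {
    private final Predicate<T> keep;

    FilteringFetcher(int shardCount, Predicate<T> keep) {
        super(shardCount);
        this.keep = keep;
    }

    @Override
    protected void emitRecordAndUpdateState(
            T record, long recordTimestamp, int shardStateIndex, String lastSequenceNumber) {
        T toEmit = (record != null && keep.test(record)) ? record : null;
        // Delegating to super keeps emit and state update under the same lock.
        super.emitRecordAndUpdateState(toEmit, recordTimestamp, shardStateIndex, lastSequenceNumber);
    }
}
```

Delegating to the parent method, rather than reimplementing it, keeps the emit and the state update atomic with respect to checkpoints, which is presumably why the method was `final` in the first place.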