[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863874#comment-15863874
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user MayerRoman commented on the issue:

https://github.com/apache/flink/pull/3031
  
Ok


> Kafka consumer doesn't store last emmited watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1
> Reporter: Yuri Makhno
> Assignee: Roman Maier
>
> The Kafka consumer stores only Kafka offsets in state and does not store the
> last emitted watermarks, which can lead to incorrect state when a checkpoint
> is restored. Say the watermark is (timestamp - 10). With the following message
> queue, results after a checkpoint restore will differ from those of normal
> processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)
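
The scenario in the description can be replayed with a small, self-contained simulation. This is only a sketch and not Flink code: the class `WatermarkRestoreSketch` and the method `lateElements` are hypothetical names, and it assumes a simplified rule in which an element counts as late when its timestamp is at or below the current watermark.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the issue scenario; not part of Flink. */
class WatermarkRestoreSketch {

    /** Lag from the issue description: watermark = highest timestamp seen - 10. */
    static final long LAG = 10;

    /**
     * Replays the timestamps starting from the given watermark and returns the
     * ones that arrive at or below the watermark current at that moment.
     */
    static List<Long> lateElements(long initialWatermark, long... timestamps) {
        long watermark = initialWatermark;
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts <= watermark) {
                late.add(ts);
            }
            // The watermark only ever moves forward.
            watermark = Math.max(watermark, ts - LAG);
        }
        return late;
    }

    public static void main(String[] args) {
        // Normal processing: A(30), B(35), C(15), D(60) in a single run.
        System.out.println(lateElements(Long.MIN_VALUE, 30, 35, 15, 60));
        // Restore after the checkpoint with offsets only: C and D are replayed
        // and the watermark starts over.
        System.out.println(lateElements(Long.MIN_VALUE, 15, 60));
        // Restore with the watermark as it stood at the checkpoint (35 - 10 = 25).
        System.out.println(lateElements(25, 15, 60));
    }
}
```

Under these assumptions, C is reported as late in the first and third runs but not in the second, which is the divergence the issue describes.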



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863875#comment-15863875
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user MayerRoman closed the pull request at:

https://github.com/apache/flink/pull/3031




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-02-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863791#comment-15863791
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4616:


We decided that user function code should be free of the responsibility of 
checkpointing watermarks and should simply leave that to the streaming 
internals (checkpointed by window operators, and perhaps also by source 
operators). Therefore, this is not needed in the Kafka consumer.



[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863789#comment-15863789
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3031
  
Thanks. Can you please close this PR then :-D ? I'll close the JIRA.




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863609#comment-15863609
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user MayerRoman commented on the issue:

https://github.com/apache/flink/pull/3031
  
Hello, Tzu-Li Tai!
I think you made a good decision, and I agree that the PR and JIRA 
ticket can be closed.

Do not worry about the work already done; I gained good experience in the process.




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863299#comment-15863299
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user MayerRoman commented on the issue:

https://github.com/apache/flink/pull/3031
  
I have, I hope, finished with another issue, so I am coming back to this one.

First, I want to ask a question that may make all the others unnecessary.

Tzu-Li Tai, did I understand correctly that if the discussion about letting the 
window operators checkpoint watermarks leads to a decision to implement this 
functionality in the window operators, the need to preserve watermark state in 
the Kafka consumer will disappear?




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15863326#comment-15863326
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3031
  
Hi @MayerRoman! Thank you for coming back to this issue.

I had a quick chat offline with @aljoscha about whether it would be 
reasonable to add this. Both your approach in this PR and letting window 
operators checkpoint watermarks would solve the issue of late elements 
after restore. We thought that user function code should be free of the 
responsibility of checkpointing watermarks and should simply leave that 
to the streaming internals (checkpointed by window operators, and perhaps also 
by source operators).

So, essentially, the Kafka consumer should not need to checkpoint 
watermarks, and we can close this PR and the JIRA ticket. Very sorry for the 
late discussion on this, and for having had you work on it already.

Let us know what you think and whether or not you agree :-)




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862836#comment-15862836
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3031
  
I think this PR should also include a test for the added feature.




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839419#comment-15839419
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3031
  
A re-clarification about backwards compatibility for state type changes:
Currently, one limitation for application compatibility across savepoint 
restore is that you can't change the type of state; otherwise the state restore 
will fail, so the change is not compatible. The only workaround is to have 
another field as the new state with the new type, and somehow try to "encode" / 
"decode" the watermark state into / from the original 
`Tuple2<KafkaTopicPartition, Long>`. I don't think this is easily possible ...

At the same time, there was a recent discussion about letting the window 
operators also checkpoint watermarks. So perhaps we might not need to let the 
Kafka sources checkpoint watermarks in the end, if the window operators already 
take care of restoring the previous event time.
What I'm curious about right now is whether, in the future, redistribution 
of Kafka partition states across source subtasks will work well with the 
checkpointed watermarks in the downstream window operators. I don't think 
there should be a problem.

@aljoscha can you perhaps help clarify this?




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839321#comment-15839321
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935971
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java ---
@@ -61,6 +61,10 @@ public long getCurrentWatermarkTimestamp() {
         return partitionWatermark;
     }
 
+    void setCurrentWatermarkTimestamp(long watermarkTimestamp) {
--- End diff --

The other methods seem to be `public` (although they can actually be 
package-private). Should we stay consistent with that here?




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839317#comment-15839317
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935696
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
     //
 
     /**
-     * Takes a snapshot of the partition offsets.
+     * Takes a snapshot of the partition offsets and watermarks.
      *
      * Important: This method mus be called under the checkpoint lock.
      *
-     * @return A map from partition to current offset.
+     * @return A map from partition to current offset and watermark.
      */
-    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+    public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
         // this method assumes that the checkpoint lock is held
         assert Thread.holdsLock(checkpointLock);
 
-        HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-        for (KafkaTopicPartitionState partition : subscribedPartitions()) {
-            state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+        HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+        switch (timestampWatermarkMode) {
+
+            case NO_TIMESTAMPS_WATERMARKS: {
+
+                for (KafkaTopicPartitionState partition : allPartitions) {
+                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
+                }
+
+                return state;
+            }
+
+            case PERIODIC_WATERMARKS: {
+                KafkaTopicPartitionStateWithPeriodicWatermarks [] partitions =
+                        (KafkaTopicPartitionStateWithPeriodicWatermarks []) allPartitions;
+
+                for (KafkaTopicPartitionStateWithPeriodicWatermarks partition : partitions) {
+                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
+                }
+
+                return state;
+            }
+
+            case PUNCTUATED_WATERMARKS: {
+                KafkaTopicPartitionStateWithPunctuatedWatermarks [] partitions =
+                        (KafkaTopicPartitionStateWithPunctuatedWatermarks []) allPartitions;
+
+                for (KafkaTopicPartitionStateWithPunctuatedWatermarks partition : partitions) {
+                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
+                }
+
+                return state;
+            }
+
+            default:
+                // cannot happen, add this as a guard for the future
+                throw new RuntimeException();
--- End diff --

Would be good to have a reason message here.
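
As an illustration of that suggestion, a guard with a descriptive message might look like the sketch below. It is not the code from the PR: the class `WatermarkModeGuardSketch`, the method `describe`, and the message wording are hypothetical, and the mode constants are assumed to be plain `int` values because the diff only shows their names.

```java
/** Hypothetical sketch of a default branch that explains why it failed. */
class WatermarkModeGuardSketch {

    // Assumed to be int constants for this sketch; the diff shows only the names.
    static final int NO_TIMESTAMPS_WATERMARKS = 0;
    static final int PERIODIC_WATERMARKS = 1;
    static final int PUNCTUATED_WATERMARKS = 2;

    /** Returns a short description of what would be snapshotted in each mode. */
    static String describe(int timestampWatermarkMode) {
        switch (timestampWatermarkMode) {
            case NO_TIMESTAMPS_WATERMARKS:
                return "offsets only";
            case PERIODIC_WATERMARKS:
                return "offsets and periodic watermarks";
            case PUNCTUATED_WATERMARKS:
                return "offsets and punctuated watermarks";
            default:
                // Naming the offending value makes a future misuse easy to diagnose.
                throw new IllegalStateException(
                        "Unexpected timestamp/watermark mode: " + timestampWatermarkMode);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(PERIODIC_WATERMARKS));
    }
}
```

`IllegalStateException` is used here only as one reasonable choice; a `RuntimeException` with a message would serve the same purpose.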




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839320#comment-15839320
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97758477
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -345,39 +345,39 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
             LOG.debug("snapshotState() called on closed source");
         } else {
 
-            offsetsStateForCheckpoint.clear();
+            offsetsAndWatermarksStateForCheckpoint.clear();
 
             final AbstractFetcher fetcher = this.kafkaFetcher;
             if (fetcher == null) {
                 // the fetcher has not yet been initialized, which means we need to return the
-                // originally restored offsets or the assigned partitions
+                // originally restored offsets and watermarks or the assigned partitions
 
-                if (restoreToOffset != null) {
+                if (restoreToOffsetAndWatermark != null) {
 
-                    for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
-                        offsetsStateForCheckpoint.add(
-                                Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
+                    for (Map.Entry<KafkaTopicPartition, Tuple2<Long, Long>> kafkaTopicPartitionOffsetAndWatermark : restoreToOffsetAndWatermark.entrySet()) {
+                        offsetsAndWatermarksStateForCheckpoint.add(
+                                Tuple2.of(kafkaTopicPartitionOffsetAndWatermark.getKey(), kafkaTopicPartitionOffsetAndWatermark.getValue()));
--- End diff --

Having a specific checkpoint state object will also be helpful for code 
readability in situations like this one (it's quite tricky to understand 
quickly what the key / value refers to, as well as some of the `f0`, `f1` calls 
in other parts of the PR). I know the previous code used `f0` and `f1` also, but 
I think it's a good opportunity to improve that.




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839316#comment-15839316
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935720
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
     //
 
     /**
-     * Takes a snapshot of the partition offsets.
+     * Takes a snapshot of the partition offsets and watermarks.
      *
      * Important: This method mus be called under the checkpoint lock.
      *
-     * @return A map from partition to current offset.
+     * @return A map from partition to current offset and watermark.
      */
-    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+    public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
         // this method assumes that the checkpoint lock is held
         assert Thread.holdsLock(checkpointLock);
 
-        HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-        for (KafkaTopicPartitionState partition : subscribedPartitions()) {
-            state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+        HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+        switch (timestampWatermarkMode) {
+
+            case NO_TIMESTAMPS_WATERMARKS: {
+
+                for (KafkaTopicPartitionState partition : allPartitions) {
+                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), Long.MIN_VALUE));
+                }
+
+                return state;
+            }
+
+            case PERIODIC_WATERMARKS: {
+                KafkaTopicPartitionStateWithPeriodicWatermarks [] partitions =
+                        (KafkaTopicPartitionStateWithPeriodicWatermarks []) allPartitions;
+
+                for (KafkaTopicPartitionStateWithPeriodicWatermarks partition : partitions) {
+                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentWatermarkTimestamp()));
+                }
+
+                return state;
+            }
+
+            case PUNCTUATED_WATERMARKS: {
+                KafkaTopicPartitionStateWithPunctuatedWatermarks [] partitions =
+                        (KafkaTopicPartitionStateWithPunctuatedWatermarks []) allPartitions;
+
+                for (KafkaTopicPartitionStateWithPunctuatedWatermarks partition : partitions) {
+                    state.put(partition.getKafkaTopicPartition(), Tuple2.of(partition.getOffset(), partition.getCurrentPartitionWatermark()));
+                }
+
+                return state;
+            }
+
+            default:
+                // cannot happen, add this as a guard for the future
+                throw new RuntimeException();
         }
-        return state;
     }
 
     /**
-     * Restores the partition offsets.
+     * Restores the partition offsets and watermarks.
      *
-     * @param snapshotState The offsets for the partitions
+     * @param snapshotState The offsets and watermarks for the partitions
      */
-    public void restoreOffsets(Map<KafkaTopicPartition, Long> snapshotState) {
-        for (KafkaTopicPartitionState partition : allPartitions) {
-            Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-            if (offset != null) {
-                partition.setOffset(offset);
+    public void restoreOffsetsAndWatermarks(Map<KafkaTopicPartition, Tuple2<Long, Long>> snapshotState) {
+
+        switch (timestampWatermarkMode) {
+
+            case NO_TIMESTAMPS_WATERMARKS: {
+                for (KafkaTopicPartitionState partition : allPartitions) {
+                    Long offset = snapshotState.get(partition.getKafkaTopicPartition()).f0;
+                    if (offset != null) {
+

[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839318#comment-15839318
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97757847
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -101,7 +101,7 @@
      * The assigner is kept in serialized form, to deserialize it into multiple copies */
     private SerializedValue punctuatedWatermarkAssigner;
 
-    private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
+    private transient ListState<Tuple2<KafkaTopicPartition, Tuple2<Long, Long>>> offsetsAndWatermarksStateForCheckpoint;
--- End diff --

I think we should switch to have a specific checkpointed state object 
instead of continuing to "extend" the original Tuple. This will also be helpful 
for compatibility for any future changes to the checkpointed state.
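
A dedicated state object along these lines could be sketched as follows. This is an illustration only and not the class the reviewers had in mind: the name `PartitionCheckpointState`, its fields, and the `withoutWatermark` factory are all hypothetical, and topic and partition are modelled as a plain `String` and `int` so the example stays self-contained.

```java
import java.io.Serializable;

/**
 * Hypothetical checkpoint state object; the class name, fields, and factory
 * method are illustrative and do not come from the PR.
 */
final class PartitionCheckpointState implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long watermark;

    PartitionCheckpointState(String topic, int partition, long offset, long watermark) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.watermark = watermark;
    }

    /** For modes without watermarks, mirroring the Long.MIN_VALUE placeholder in the diff. */
    static PartitionCheckpointState withoutWatermark(String topic, int partition, long offset) {
        return new PartitionCheckpointState(topic, partition, offset, Long.MIN_VALUE);
    }

    String getTopic() { return topic; }
    int getPartition() { return partition; }
    long getOffset() { return offset; }
    long getWatermark() { return watermark; }

    @Override
    public String toString() {
        return topic + "-" + partition + " @ offset " + offset + ", watermark " + watermark;
    }

    public static void main(String[] args) {
        PartitionCheckpointState state = new PartitionCheckpointState("events", 0, 42L, 25L);
        // Named accessors replace positional f0 / f1 lookups on nested tuples.
        System.out.println(state.getOffset() + " / " + state.getWatermark());
        System.out.println(state);
    }
}
```

Compared with a nested tuple, named accessors document what each value means at the call site, and new fields can be added later without changing the shape of every caller.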




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839319#comment-15839319
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3031#discussion_r97935636
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
@@ -175,34 +176,115 @@ protected AbstractFetcher(
     //
 
     /**
-     * Takes a snapshot of the partition offsets.
+     * Takes a snapshot of the partition offsets and watermarks.
      *
      * Important: This method mus be called under the checkpoint lock.
      *
-     * @return A map from partition to current offset.
+     * @return A map from partition to current offset and watermark.
      */
-    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
+    public HashMap<KafkaTopicPartition, Tuple2<Long, Long>> snapshotCurrentState() {
         // this method assumes that the checkpoint lock is held
         assert Thread.holdsLock(checkpointLock);
 
-        HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
-        for (KafkaTopicPartitionState partition : subscribedPartitions()) {
-            state.put(partition.getKafkaTopicPartition(), partition.getOffset());
+        HashMap<KafkaTopicPartition, Tuple2<Long, Long>> state = new HashMap<>(allPartitions.length);
+
+        switch (timestampWatermarkMode) {
+
+            case NO_TIMESTAMPS_WATERMARKS: {
+
+                for (KafkaTopicPartitionState partition : allPartitions) {
--- End diff --

Excessive empty line above this line.




[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15834056#comment-15834056
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3031
  
Thank you for the contribution @MayerRoman. Just want to let you know that 
I've noticed this PR, and I think the issue is definitely something we want to 
fix. I'll allocate some time this week to review the PR.


> Kafka consumer doesn't store last emmited watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.1.1
>Reporter: Yuri Makhno
>Assignee: Roman Maier
> Fix For: 1.2.0
>
>
> Kafka consumers stores in state only kafka offsets and doesn't store last 
> emmited watermarks, this may go to wrong state when checkpoint is restored:
> Let's say our watermark is (timestamp - 10) and in case we have the following 
> messages queue results will be different after checkpoint restore and during 
> normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered by next time window
> D(ts=60)





[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2016-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15769536#comment-15769536
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

Github user MayerRoman commented on the issue:

https://github.com/apache/flink/pull/3031
  
I think the changes I propose make it impossible to start from 
checkpoints created before my code changes.

This is because the consumer now saves ListState>> (partition + offset + 
watermark), whereas before it saved ListState> (partition + offset).

(I mean checkpoints from versions later than 1.1.
The recently added backward compatibility with 1.1 snapshots is taken into 
account in my commit; I think everything is OK there.)


Please advise me on how to restore backward compatibility.

I have some ideas for how to implement it:

1)  In the initializeState method, somehow check the object returned from 
stateStore.getSerializableListState(..)

https://github.com/apache/flink/pull/3031/files?diff=unified#diff-06bf4a7f73d98ef91309154654563475R321

to see whether it is
ListState>
or
ListState>>

2)  Use a separate state object for saving the watermarks.

Or perhaps a different approach is needed.

I would be grateful for help.
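
As an illustration of idea 2) above, the following plain-Java sketch keeps watermarks in a separate map next to the offsets, so that a snapshot written by the older code (offsets only) can still be restored. Everything here is hypothetical: `Snapshot`, `PartitionState`, `restore`, and the use of `Long.MIN_VALUE` as the "no watermark yet" default are stand-ins chosen for the example, not Flink classes or the code in the pull request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "a separate state object for watermarks"; not Flink code.
class SeparateWatermarkStateSketch {

    /** Stand-in for a restored checkpoint. */
    static final class Snapshot {
        final Map<String, Long> offsets;     // partition -> offset, present in old and new snapshots
        final Map<String, Long> watermarks;  // partition -> watermark, null in old snapshots

        Snapshot(Map<String, Long> offsets, Map<String, Long> watermarks) {
            this.offsets = offsets;
            this.watermarks = watermarks;
        }
    }

    /** Restored per-partition position: offset plus last emitted watermark. */
    static final class PartitionState {
        final long offset;
        final long watermark;

        PartitionState(long offset, long watermark) {
            this.offset = offset;
            this.watermark = watermark;
        }
    }

    /** Restores offsets as before; watermarks fall back to a default when absent. */
    static Map<String, PartitionState> restore(Snapshot snapshot) {
        Map<String, PartitionState> restored = new HashMap<>();
        for (Map.Entry<String, Long> e : snapshot.offsets.entrySet()) {
            long watermark = Long.MIN_VALUE; // assumed default: no watermark emitted yet
            if (snapshot.watermarks != null && snapshot.watermarks.containsKey(e.getKey())) {
                watermark = snapshot.watermarks.get(e.getKey());
            }
            restored.put(e.getKey(), new PartitionState(e.getValue(), watermark));
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        Map<String, Long> watermarks = new HashMap<>();
        watermarks.put("topic-0", 25L);

        // Old-format snapshot: offsets only.
        PartitionState fromOld = restore(new Snapshot(offsets, null)).get("topic-0");
        // New-format snapshot: offsets plus the separate watermark map.
        PartitionState fromNew = restore(new Snapshot(offsets, watermarks)).get("topic-0");

        System.out.println(fromOld.offset + " " + (fromOld.watermark == Long.MIN_VALUE));
        System.out.println(fromNew.offset + " " + fromNew.watermark);
    }
}
```

Because the offsets keep their original shape in this sketch, the restore path never has to distinguish between two element types as in idea 1); only the presence or absence of the extra map matters.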



> Kafka consumer doesn't store last emmited watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1
> Reporter: Yuri Makhno
> Assignee: Roman Maier
> Fix For: 1.2.0
>
>
> Kafka consumers store only Kafka offsets in state and don't store the last 
> emitted watermarks, so the consumer may end up in the wrong state when a 
> checkpoint is restored.
> Let's say our watermark is (timestamp - 10). Given the following message 
> queue, results after a checkpoint restore will differ from those of 
> normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered out by the next time window
> D(ts=60)





[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2016-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15763551#comment-15763551
 ] 

ASF GitHub Bot commented on FLINK-4616:
---

GitHub user MayerRoman opened a pull request:

https://github.com/apache/flink/pull/3031

[FLINK-4616] Added functionality through which watermarks for each partition are saved and loaded via checkpointing mechanism

[FLINK-4616] Kafka consumer doesn't store last emmited watermarks per 
partition in state.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MayerRoman/flink FLINK_4616

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3031.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3031


commit bb9a54903dd445aa4d0750b1a0d6d1d592ab891f
Author: Roman Maier 
Date:   2016-12-20T07:28:12Z

[FLINK-4616] Added functionality through which watermarks for each 
partition are saved and loaded via checkpointing mechanism




> Kafka consumer doesn't store last emmited watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1
> Reporter: Yuri Makhno
> Assignee: Roman Maier
> Fix For: 1.2.0
>
>
> Kafka consumers store only Kafka offsets in state and don't store the last 
> emitted watermarks, so the consumer may end up in the wrong state when a 
> checkpoint is restored.
> Let's say our watermark is (timestamp - 10). Given the following message 
> queue, results after a checkpoint restore will differ from those of 
> normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered out by the next time window
> D(ts=60)





[jira] [Commented] (FLINK-4616) Kafka consumer doesn't store last emmited watermarks per partition in state

2016-09-21 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15510313#comment-15510313
 ] 

Aljoscha Krettek commented on FLINK-4616:
-

+1, I think this would be good to have.

> Kafka consumer doesn't store last emmited watermarks per partition in state
> ---
>
> Key: FLINK-4616
> URL: https://issues.apache.org/jira/browse/FLINK-4616
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.1.1
> Reporter: Yuri Makhno
> Fix For: 1.2.0, 1.1.3
>
>
> Kafka consumers store only Kafka offsets in state and don't store the last 
> emitted watermarks, so the consumer may end up in the wrong state when a 
> checkpoint is restored.
> Let's say our watermark is (timestamp - 10). Given the following message 
> queue, results after a checkpoint restore will differ from those of 
> normal processing:
> A(ts = 30)
> B(ts = 35)
> -- checkpoint goes here
> C(ts=15) -- this one should be filtered out by the next time window
> D(ts=60)


