[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493950#comment-15493950
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4617:


Revisited and reconfirmed that this is actually a bug in the 0.9 consumer 
(sorry for mis-concluding in the first place). Please see FLINK-4618.

> Kafka & Flink duplicate messages on restart
> ---
>
> Key: FLINK-4617
> URL: https://issues.apache.org/jira/browse/FLINK-4617
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, State Backends, Checkpointing
> Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
> Environment: Ubuntu 16.04
> Flink 1.1.*
> Kafka 0.9.0.1
> Scala 2.11.7
> Java 1.8.0_91
> Reporter: Matthew Barlocker
> Priority: Critical
>
> [StackOverflow 
> Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]
> Flink (the kafka connector) re-runs the last 3-9 messages it saw before it 
> was shut down.
> *My code:*
> {code}
> import java.util.Properties
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.CheckpointingMode
> import org.apache.flink.streaming.connectors.kafka._
> import org.apache.flink.streaming.util.serialization._
> import org.apache.flink.runtime.state.filesystem._
> object Runner {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.enableCheckpointing(500)
>     env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
>     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>     val properties = new Properties()
>     properties.setProperty("bootstrap.servers", "localhost:9092")
>     properties.setProperty("group.id", "testing")
>     val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
>     val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
>     env.addSource(kafkaConsumer)
>       .addSink(kafkaProducer)
>     env.execute()
>   }
> }
> {code}
> *My sbt dependencies:*
> {code}
> libraryDependencies ++= Seq(
> "org.apache.flink" %% "flink-scala" % "1.1.2",
> "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
> "org.apache.flink" %% "flink-clients" % "1.1.2",
> "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
> "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
> )
> {code}
> *My process:*
> using 3 terminals:
> {code}
> TERM-1 start sbt, run program
> TERM-2 create kafka topics testing-in and testing-out
> TERM-2 run kafka-console-producer on testing-in topic
> TERM-3 run kafka-console-consumer on testing-out topic
> TERM-2 send data to kafka producer.
> Wait for a couple seconds (buffers need to flush)
> TERM-3 watch data appear in testing-out topic
> Wait for at least 500 milliseconds for checkpointing to happen
> TERM-1 stop sbt
> TERM-1 run sbt
> TERM-3 watch last few lines of data appear in testing-out topic
> {code}
> *My expectations:*
> When there are no errors in the system, I expect to be able to turn flink on 
> and off without reprocessing messages that successfully completed the stream 
> in a prior run.
> *My attempts to fix:*
> I've added the call to setStateBackend, thinking that perhaps the default 
> memory backend just didn't remember correctly. That didn't seem to help.
> I've removed the call to enableCheckpointing, hoping that perhaps there was a 
> separate mechanism to track state in Flink vs Zookeeper. That didn't seem to 
> help.
> I've used different sinks, RollingFileSink, print(); hoping that maybe the 
> bug was in kafka. That didn't seem to help.
> I've rolled back to flink (and all connectors) v1.1.0 and v1.1.1, hoping that 
> maybe the bug was in the latest version. That didn't seem to help.
> I've added the zookeeper.connect config to the properties object, hoping that 
> the comment about it only being useful in 0.8 was wrong. That didn't seem to 
> help.
> I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea 
> drfloob). That didn't seem to help.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-15 Thread Matthew Barlocker (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15493667#comment-15493667
 ] 

Matthew Barlocker commented on FLINK-4617:
--

Sounds good. Thanks!



[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15492513#comment-15492513
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4617:


Hi [~mbarlocker], now that this is sorted out (on SO), I'm closing this 
ticket ;)



[jira] [Commented] (FLINK-4617) Kafka & Flink duplicate messages on restart

2016-09-14 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15489944#comment-15489944
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4617:


Hi Matthew,
From your description, I'm assuming you're manually shutting down the job and 
then resubmitting it, correct?
Flink does not retain exactly-once across manual job restarts, unless you use 
savepoints 
(https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html).
The exactly-once guarantee applies when the job fails and automatically 
restores itself from previous checkpoints (when checkpointing is enabled, like 
what you did with {{env.enableCheckpointing(500)}} :) )

What is actually happening is that the Kafka consumer simply starts reading 
from the existing offsets committed in ZK / Kafka when you manually resubmit 
the job. These offsets were committed to ZK / Kafka the first time you executed 
the job. However, they are not used for Flink's exactly-once semantics; Flink 
uses internally checkpointed Kafka offsets for that. The Kafka consumer commits 
those offsets back to ZK simply to expose a measure of the job's consumption 
progress to the outside world (wrt Flink).
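
To make the mechanism above more concrete, here is a small toy model in plain Scala. It is not Flink or Kafka code, and every name in it (OffsetReplayModel, RunResult, run) is hypothetical. It assumes that the externally committed offset is only updated when a checkpoint completes, so it can lag behind the records already emitted. It only sketches why a manual resubmission that starts from the committed offset may re-read records; it is not a diagnosis of the behaviour reported in this ticket.

```scala
// Toy model only: no Flink or Kafka classes are used, and all names are hypothetical.
object OffsetReplayModel {

  /** Records emitted during one run, plus the offset last committed externally. */
  final case class RunResult(emitted: Vector[Long], committedOffset: Long)

  /**
   * Emits offsets from `startOffset` up to (but excluding) `stopBefore`.
   * Assumption: after every `interval` records a checkpoint completes, and only
   * then is the position of the next record committed externally
   * (standing in for the offsets stored in ZK / Kafka).
   */
  def run(startOffset: Long, stopBefore: Long, interval: Int): RunResult = {
    var committed = startOffset
    var offset = startOffset
    val emitted = Vector.newBuilder[Long]
    while (offset < stopBefore) {
      emitted += offset
      offset += 1
      // A "checkpoint" completes here and the external offset catches up.
      if ((offset - startOffset) % interval == 0) committed = offset
    }
    RunResult(emitted.result(), committed)
  }

  def main(args: Array[String]): Unit = {
    // First run: the job is stopped manually after emitting offsets 0 to 12.
    val first = run(startOffset = 0L, stopBefore = 13L, interval = 5)

    // Manual resubmission: no checkpoint state is restored, so reading
    // starts again from the externally committed offset.
    val second = run(startOffset = first.committedOffset, stopBefore = 20L, interval = 5)

    val duplicates = first.emitted.intersect(second.emitted)
    println(s"committed offset after first run: ${first.committedOffset}")
    println(s"records emitted twice: ${duplicates.mkString(", ")}")
  }
}
```

In this model the first run commits offset 10 but emits up to offset 12, so the second run emits offsets 10, 11 and 12 again. Restoring from a savepoint would correspond to starting the second run at offset 13 instead.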
