[jira] [Commented] (FLINK-22190) no guarantee on Flink exactly_once sink to Kafka

2021-05-19 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17347469#comment-17347469
 ] 

Spongebob commented on FLINK-22190:
---

my mistake lead to this issus. But I found that I must set group.id when source 
from kafka and enable checkpoints other else I would got an 
org.apache.kafka.common.errors.InvalidGroupIdException that would end the sink 
tasks.

> no guarantee on Flink exactly_once sink to Kafka 
> -
>
> Key: FLINK-22190
> URL: https://issues.apache.org/jira/browse/FLINK-22190
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.12.2
> Environment: *flink: 1.12.2*
> *kafka: 2.7.0*
>Reporter: Spongebob
>Priority: Major
>
> When I tried to test the function of flink exactly_once sink to kafka, I 
> found it can not run as expectation.  here's the pipline of the flink 
> applications: raw data(flink app0)-> kafka topic1 -> flink app1 -> kafka 
> topic2 -> flink app2, flink tasks may met / byZeroException in random. Below 
> shows the codes:
> {code:java}
> //代码占位符
> raw data, flink app0:
> class SimpleSource1 extends SourceFunction[String] {
>  var switch = true
>  val students: Array[String] = Array("Tom", "Jerry", "Gory")
>  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit 
> = {
>  var i = 0
>  while (switch) {
>  sourceContext.collect(s"${students(Random.nextInt(students.length))},$i")
>  i += 1
>  Thread.sleep(5000)
>  }
>  }
>  override def cancel(): Unit = switch = false
> }
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val dataStream = streamEnv.addSource(new SimpleSource1)
> dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092", 
> "single-partition-topic-2", new SimpleStringSchema()))
> streamEnv.execute("sink kafka")
>  
> flink-app1:
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
> val prop = new Properties()
> prop.setProperty("bootstrap.servers", "xfy:9092")
> prop.setProperty("group.id", "test")
> val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
>  "single-partition-topic-2",
>  new SimpleStringSchema,
>  prop
> ))
> val resultStream = dataStream.map(x => {
>  val data = x.split(",")
>  (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString()
> }
> )
> resultStream.print().setParallelism(1)
> val propProducer = new Properties()
> propProducer.setProperty("bootstrap.servers", "xfy:9092")
> propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}")
> resultStream.addSink(new FlinkKafkaProducer[String](
>  "single-partition-topic",
>  new MyKafkaSerializationSchema("single-partition-topic"),
>  propProducer,
>  Semantic.EXACTLY_ONCE))
> streamEnv.execute("sink kafka")
>  
> flink-app2:
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val prop = new Properties()
> prop.setProperty("bootstrap.servers", "xfy:9092")
> prop.setProperty("group.id", "test")
> prop.setProperty("isolation_level", "read_committed")
> val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
>  "single-partition-topic",
>  new SimpleStringSchema,
>  prop
> ))
> dataStream.print().setParallelism(1)
> streamEnv.execute("consumer kafka"){code}
>  
> flink app1 will print some duplicate numbers, and to my expectation flink 
> app2 will deduplicate them but the fact shows not.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22190) no guarantee on Flink exactly_once sink to Kafka

2021-04-12 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319910#comment-17319910
 ] 

Arvid Heise commented on FLINK-22190:
-

1. You get byZeroException because you are dividing by 0 in user code {{/ 
Random.nextInt(5)}}. That's something that you need to fix on your end.
2. Could you provide example output to show the duplicates? Where does the 
fail-over happen?

Note that exactly once does not mean deduplication of records or parts thereof. 
Exactly once ensures that there are no duplicates caused by fail-over/restarts.

> no guarantee on Flink exactly_once sink to Kafka 
> -
>
> Key: FLINK-22190
> URL: https://issues.apache.org/jira/browse/FLINK-22190
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.12.2
> Environment: *flink: 1.12.2*
> *kafka: 2.7.0*
>Reporter: Spongebob
>Priority: Major
>
> When I tried to test the function of flink exactly_once sink to kafka, I 
> found it can not run as expectation.  here's the pipline of the flink 
> applications: raw data(flink app0)-> kafka topic1 -> flink app1 -> kafka 
> topic2 -> flink app2, flink tasks may met / byZeroException in random. Below 
> shows the codes:
> {code:java}
> //代码占位符
> raw data, flink app0:
> class SimpleSource1 extends SourceFunction[String] {
>  var switch = true
>  val students: Array[String] = Array("Tom", "Jerry", "Gory")
>  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit 
> = {
>  var i = 0
>  while (switch) {
>  sourceContext.collect(s"${students(Random.nextInt(students.length))},$i")
>  i += 1
>  Thread.sleep(5000)
>  }
>  }
>  override def cancel(): Unit = switch = false
> }
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val dataStream = streamEnv.addSource(new SimpleSource1)
> dataStream.addSink(new FlinkKafkaProducer[String]("xfy:9092", 
> "single-partition-topic-2", new SimpleStringSchema()))
> streamEnv.execute("sink kafka")
>  
> flink-app1:
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
> val prop = new Properties()
> prop.setProperty("bootstrap.servers", "xfy:9092")
> prop.setProperty("group.id", "test")
> val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
>  "single-partition-topic-2",
>  new SimpleStringSchema,
>  prop
> ))
> val resultStream = dataStream.map(x => {
>  val data = x.split(",")
>  (data(0), data(1), data(1).toInt / Random.nextInt(5)).toString()
> }
> )
> resultStream.print().setParallelism(1)
> val propProducer = new Properties()
> propProducer.setProperty("bootstrap.servers", "xfy:9092")
> propProducer.setProperty("transaction.timeout.ms", s"${1000 * 60 * 5}")
> resultStream.addSink(new FlinkKafkaProducer[String](
>  "single-partition-topic",
>  new MyKafkaSerializationSchema("single-partition-topic"),
>  propProducer,
>  Semantic.EXACTLY_ONCE))
> streamEnv.execute("sink kafka")
>  
> flink-app2:
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val prop = new Properties()
> prop.setProperty("bootstrap.servers", "xfy:9092")
> prop.setProperty("group.id", "test")
> prop.setProperty("isolation_level", "read_committed")
> val dataStream = streamEnv.addSource(new FlinkKafkaConsumer[String](
>  "single-partition-topic",
>  new SimpleStringSchema,
>  prop
> ))
> dataStream.print().setParallelism(1)
> streamEnv.execute("consumer kafka"){code}
>  
> flink app1 will print some duplicate numbers, and to my expectation flink 
> app2 will deduplicate them but the fact shows not.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)