Re: Problem when use kafka sink with EXACTLY_ONCE in IDEA

2019-01-02 Thread Till Rohrmann
Hi Kaibo,

which Kafka version are you running locally? When enabling exactly-once
processing guarantees, you need a Kafka broker of version 0.11 or later. The
UnsupportedVersionException indicates that this constraint is not fulfilled
[1].

[1]
https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
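For context, the log reports the client library as "Kafka version : 0.11.0.2", so the client side already meets the requirement and the mismatch is on the broker side. A transactional producer locates its transaction coordinator with a FindCoordinator v1 request, which only brokers of 0.11 or later understand. An older broker supports only v0, which is what the UnsupportedVersionException above complains about. A broker usually prints its own version in a similar "Kafka version" line at startup. The Scala sketch below illustrates the version constraint. `TransactionSupport` and its methods are hypothetical helpers, not part of Kafka or Flink, and the sketch assumes you already have the broker version as a string.

```scala
// Hypothetical helper (not a Kafka or Flink API): decides whether a broker
// version string such as "0.10.2.1", "0.11.0.2" or "2.1.0" is new enough
// for Kafka transactions, which require brokers of version 0.11 or later.
object TransactionSupport {
  // Minimum broker version for transactional / EXACTLY_ONCE producers.
  private val MinTransactional: Seq[Int] = Seq(0, 11)

  /** Parses the leading numeric components of a dotted version string,
    * e.g. "1.0.0-cp1" -> Seq(1, 0, 0). */
  def parse(version: String): Seq[Int] =
    version.split("\\.").toSeq
      .map(_.takeWhile(_.isDigit)) // drop suffixes such as "-cp1"
      .takeWhile(_.nonEmpty)       // stop at the first non-numeric component
      .map(_.toInt)

  /** Component-wise comparison, padding the shorter version with zeros. */
  def atLeast(version: Seq[Int], minimum: Seq[Int]): Boolean = {
    val len = math.max(version.length, minimum.length)
    val a = version.padTo(len, 0)
    val b = minimum.padTo(len, 0)
    a.zip(b).find { case (x, y) => x != y } match {
      case Some((x, y)) => x > y // first differing component decides
      case None         => true  // equal versions satisfy the minimum
    }
  }

  /** True if a broker of this version can serve a transactional producer. */
  def supportsTransactions(brokerVersion: String): Boolean =
    atLeast(parse(brokerVersion), MinTransactional)
}
```

For example, `TransactionSupport.supportsTransactions("0.10.2.1")` returns false, while "0.11.0.2" and "2.1.0" return true.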

Cheers,
Till

On Wed, Jan 2, 2019 at 5:02 AM Kaibo Zhou  wrote:

Problem when use kafka sink with EXACTLY_ONCE in IDEA

2019-01-01 Thread Kaibo Zhou
Hi,
I encountered an error while running the Kafka sink demo in IDEA.

This is the complete code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object kafka_test {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Store checkpoint data on the local filesystem.
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoint"))

    val config = env.getCheckpointConfig
    // Keep externalized checkpoints when the job is cancelled.
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Checkpoint every 15 seconds; the EXACTLY_ONCE sink commits its Kafka
    // transactions when a checkpoint completes.
    config.setCheckpointInterval(15 * 1000)

    val event = env.socketTextStream("localhost", )
    val propsTarget = new Properties()
    propsTarget.setProperty("bootstrap.servers", "127.0.0.1:9092")
    propsTarget.setProperty("enable.idempotence", "true")

    val outputProducer = new FlinkKafkaProducer011[String](
      "test-output",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      propsTarget,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE // works when changed to Semantic.AT_LEAST_ONCE
    )

    event.addSink(outputProducer).name("sink_to_kafka")
    env.execute()
  }
}

Run the command "nc -l " before running the above code.
Error message:

7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
org.apache.kafka.common.utils.AppInfoParser  - Kafka version : 0.11.0.2
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
org.apache.kafka.common.utils.AppInfoParser  - Kafka commitId :
73be1e1168f91ee2
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011  -
Starting FlinkKafkaProducer (1/1) to produce into default topic test-output
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
org.apache.kafka.clients.producer.internals.TransactionManager  -
[TransactionalId Source: Socket Stream -> Sink:
sink_to_kafka-7df19f87deec5680128845fd9a6ca18d-6] ProducerId set to -1 with
epoch -1
7199 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
org.apache.kafka.clients.producer.KafkaProducer  - Closing the Kafka
producer with timeoutMillis = 9223372036854775807 ms.
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
org.apache.flink.runtime.taskmanager.Task  - Source: Socket Stream -> Sink:
sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) switched from
RUNNING to FAILED.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a
v0 FindCoordinator request because we require features supported only in 1
or later.
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
org.apache.flink.runtime.taskmanager.Task  - Freeing task resources for
Source: Socket Stream -> Sink: sink_to_kafka (1/1)
(a7cea618f99152987bb4a52b4d1df0e3).
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO
org.apache.flink.runtime.taskmanager.Task  - Ensuring all FileSystem
streams are closed for task Source: Socket Stream -> Sink: sink_to_kafka
(1/1) (a7cea618f99152987bb4a52b4d1df0e3) [FAILED]
7201 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor  - Un-registering task
and sending final execution state FAILED to JobManager for task Source:
Socket Stream -> Sink: sink_to_kafka a7cea618f99152987bb4a52b4d1df0e3.
7201 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Socket
Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3)
switched from RUNNING to FAILED.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a
v0 FindCoordinator request because we require features supported only in 1
or later.
7201 [flink-akka.actor.default-dispatcher-5] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink
Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state
RUNNING to FAILING.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a
v0 FindCoordinator request because we require features supported only in 1
or later.
7202 [flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor  - Discarding the
results produced by task execution a7cea618f99152987bb4a52b4d1df0e3.
7202 [flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Try to restart or
fail the job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) if no
longer possible.
7202 [flink-akka.actor.default-dispatcher-2] INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job Flink
Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state