Re: Problem when use kafka sink with EXACTLY_ONCE in IDEA
Hi Kaibo,

which Kafka version are you running locally? When enabling exactly-once processing guarantees, you need Kafka 0.11 or newer. The UnsupportedVersionException indicates that this constraint is not fulfilled [1].

[1] https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

Cheers,
Till

On Wed, Jan 2, 2019 at 5:02 AM Kaibo Zhou wrote:
> Hi,
> I encountered an error while running the kafka sink demo in IDEA.
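The version constraint from the reply can be written down as a small check. This is only a sketch: `BrokerVersionCheck`, `parseMajorMinor` and `supportsTransactions` are hypothetical helpers, not part of Flink or Kafka, and the broker version string is assumed to come from your own installation (the "Kafka version : 0.11.0.2" line in the log reports the client library, which may differ from the broker).

```scala
import scala.util.Try

object BrokerVersionCheck {

  // Hypothetical helper: extracts (major, minor) from a version string
  // such as "0.11.0.2". Returns None if the string cannot be parsed.
  def parseMajorMinor(version: String): Option[(Int, Int)] = {
    val parts = version.trim.split("\\.")
    if (parts.length < 2) None
    else Try((parts(0).toInt, parts(1).toInt)).toOption
  }

  // Kafka transactions, which Semantic.EXACTLY_ONCE relies on,
  // require version 0.11 or newer.
  def supportsTransactions(brokerVersion: String): Boolean =
    parseMajorMinor(brokerVersion).exists { case (major, minor) =>
      major > 0 || minor >= 11
    }

  def main(args: Array[String]): Unit = {
    // Sample version strings chosen for illustration only.
    Seq("0.10.2.1", "0.11.0.2", "1.0.0").foreach { v =>
      println(s"$v supports transactions: ${supportsTransactions(v)}")
    }
  }
}
```

If the check fails for the broker version in use, falling back to Semantic.AT_LEAST_ONCE (which the original post reports as working) or upgrading the broker are the two options.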
Problem when use kafka sink with EXACTLY_ONCE in IDEA
Hi,

I encountered an error while running the kafka sink demo in IDEA.

This is the complete code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

object kafka_test {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoint"))

    val config = env.getCheckpointConfig
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    config.setCheckpointInterval(15 * 1000)

    val event = env.socketTextStream("localhost", )
    val propsTarget = new Properties()
    propsTarget.setProperty("bootstrap.servers", "127.0.0.1:9092")
    propsTarget.setProperty("enable.idempotence", "true")

    val outputProducer = new FlinkKafkaProducer011[String](
      "test-output",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      propsTarget,
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE // works when changed to Semantic.AT_LEAST_ONCE
    )

    event.addSink(outputProducer).name("sink_to_kafka")
    env.execute()
  }
}

Run the command "nc -l " before running the code above.
Error message:

7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.2
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 - Starting FlinkKafkaProducer (1/1) to produce into default topic test-output
7186 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId Source: Socket Stream -> Sink: sink_to_kafka-7df19f87deec5680128845fd9a6ca18d-6] ProducerId set to -1 with epoch -1
7199 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3).
7200 [Source: Socket Stream -> Sink: sink_to_kafka (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) [FAILED]
7201 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Socket Stream -> Sink: sink_to_kafka a7cea618f99152987bb4a52b4d1df0e3.
7201 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Socket Stream -> Sink: sink_to_kafka (1/1) (a7cea618f99152987bb4a52b4d1df0e3) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7201 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state RUNNING to FAILING.
org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.
7202 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Discarding the results produced by task execution a7cea618f99152987bb4a52b4d1df0e3.
7202 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) if no longer possible.
7202 [flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (e7da02d2a2ed7cd2d215e244b582b4ef) switched from state