Akshay Nagpal created FLINK-10039: ------------------------------------- Summary: FlinkKafkaProducer - Serializer Error Key: FLINK-10039 URL: https://issues.apache.org/jira/browse/FLINK-10039 Project: Flink Issue Type: Bug Components: Streaming Connectors Affects Versions: 1.4.2 Reporter: Akshay Nagpal
I am working on a use case where I input the data using Kafka's console producer, read the same data in my program using FlinkKafkaConsumer and write it back to another Kafka topic using FlinkKafkaProducer. I am using 1.4.2 version of the following dependencies: flink-java flink-streaming-java_2.11 flink-connector-kafka-0.10_2.11 The codes are as follows: KafkaConsoleProducer: {code:java} ./bin/kafka-console-producer --broker-list xxx:9092 --topic test1 --property "parse.key=true" --property "key.separator=:" --key-serializer org.apache.kafka.common.serialization.StringSerializer --value-serializer org.apache.kafka.common.serialization.StringSerializer {code} KafkaFlinkConsumer: {code:java} Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "xxx:9092"); properties.setProperty("zookeeper.connect", "xxx:2181"); properties.setProperty("group.id", "test"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>("test1", new SimpleStringSchema(), properties); DataStream<String> stream = env.addSource(myConsumer); {code} KafkaFlinkProducer: {code:java} Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "xxx:9092"); properties.setProperty("zookeeper.connect", "xxx:2181"); properties.setProperty("group.id", "test"); properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties1.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); FlinkKafkaProducer010<String> myProducer = new FlinkKafkaProducer010<String>("my-topic", new SimpleStringSchema(), properties); stream.addSink(myProducer); {code} When I specify key and value serializer as StringSerializer in FlinkKafkaProducer, it gives me the following error in the logs: {code:java} org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer {code} Though it's giving me this error, it's still producing the data in the topic. When I am using ByteArraySerializer though with the producer, it is not giving me the error in the logs. It is also giving me the output. Moreover, DataStream's print method is not printing the data on the console. -- This message was sent by Atlassian JIRA (v7.6.3#76005)