Akshay Nagpal created FLINK-10039:
-------------------------------------

             Summary: FlinkKafkaProducer - Serializer Error
                 Key: FLINK-10039
                 URL: https://issues.apache.org/jira/browse/FLINK-10039
             Project: Flink
          Issue Type: Bug
          Components: Streaming Connectors
    Affects Versions: 1.4.2
            Reporter: Akshay Nagpal


I am working on a use case where I input the data using Kafka's console 
producer, read the same data in my program using FlinkKafkaConsumer and write 
it back to another Kafka topic using FlinkKafkaProducer. 

I am using 1.4.2 version of the following dependencies:

flink-java

flink-streaming-java_2.11

flink-connector-kafka-0.10_2.11

 

The codes are as follows:

KafkaConsoleProducer:
{code:java}
./bin/kafka-console-producer --broker-list xxx:9092 --topic test1 --property 
"parse.key=true" --property "key.separator=:" --key-serializer 
org.apache.kafka.common.serialization.StringSerializer --value-serializer 
org.apache.kafka.common.serialization.StringSerializer
{code}
KafkaFlinkConsumer:
{code:java}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxx:9092");
properties.setProperty("zookeeper.connect", "xxx:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");


FlinkKafkaConsumer010<String> myConsumer = new 
FlinkKafkaConsumer010<String>("test1", 
new SimpleStringSchema(),
properties);

DataStream<String> stream = env.addSource(myConsumer);
{code}
KafkaFlinkProducer:
{code:java}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxx:9092");
properties.setProperty("zookeeper.connect", "xxx:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
properties1.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");


FlinkKafkaProducer010<String> myProducer = new 
FlinkKafkaProducer010<String>("my-topic", 
new SimpleStringSchema(), 
properties);

stream.addSink(myProducer);
{code}
When I specify key and value serializer as StringSerializer in 
FlinkKafkaProducer, it gives me the following error in the logs:

 
{code:java}
org.apache.kafka.common.errors.SerializationException: Can't convert value of 
class [B to class org.apache.kafka.common.serialization.StringSerializer 
specified in value.serializer
{code}
Though it's giving me this error, it's still producing the data in the topic.

When I am using ByteArraySerializer though with the producer, it is not giving 
me the error in the logs. It is also giving me the output.

Moreover, DataStream's print method is not printing the data on the console.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to