Hi Yudistira,

Thanks for sharing your problem with us.

Unfortunately, we currently use a Long deserializer for the record key as 
part of the Kafka adapter creation process [1].

See the following method:

private static Consumer<Long, String> createConsumer(String broker, String topic) {
    ...
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    ...
}

That would explain why you can’t read your original topic: its partition 
key can’t be deserialized into a long.
I assume you use the device ID as the partition key? What does your key look 
like?
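
To illustrate the failure mode, here is a minimal sketch that needs no Kafka dependency. The class and method names are hypothetical, and deserializeLongKey is only a stand-in that mimics the length check of Kafka's LongDeserializer (which throws a SerializationException instead of the IllegalArgumentException used here). The key "device-17" is an assumed example, not your actual key.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it is not part of StreamPipes or Kafka.
final class KeyDeserializationDemo {

    // Stand-in for a long key deserializer: a long is exactly 8 bytes,
    // so any key of a different length is rejected.
    static Long deserializeLongKey(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException(
                "Size of data received is " + data.length + ", expected " + Long.BYTES);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        // A key that was written as a long round-trips without problems.
        byte[] longKey = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
        System.out.println("Long key: " + deserializeLongKey(longKey));

        // A string key such as a device ID usually has a different length
        // and is rejected before any value is read.
        byte[] stringKey = "device-17".getBytes(StandardCharsets.UTF_8);
        try {
            deserializeLongKey(stringKey);
        } catch (IllegalArgumentException e) {
            System.out.println("Key rejected: " + e.getMessage());
        }
    }
}
```

Note that a string key that happens to be exactly 8 bytes long would pass this check and simply be interpreted as a meaningless number, so the error depends on the key length.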

In general, I guess this is an issue that we should address.
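
One possible direction, purely as a sketch and not a decision about how the adapter will be changed, would be to read the key as raw bytes so that any key type is accepted. The class and method names below are hypothetical. The configuration is written with plain string keys so that the example compiles without the Kafka client library; "key.deserializer" and "value.deserializer" are the property names behind the ConsumerConfig constants shown above.

```java
import java.util.Properties;

// Hypothetical helper; it is not the actual StreamPipes implementation.
final class KeyAgnosticConsumerProps {

    static Properties build(String broker) {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker);
        // Raw bytes never fail to deserialize, whatever the producer used as key.
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

With such a configuration the consumer would be typed as Consumer<byte[], String>, and the key could be ignored or decoded later if it is needed.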

Greetings,
Patrick


[1] https://github.com/apache/incubator-streampipes-extensions/blob/rel/0.67.0/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/KafkaProtocol.java

> On 10.02.2021 at 06:28, Yudistira Hanifmuti <[email protected]> wrote:
> 
> Hello everyone,
>  
> I have a problem with using the data stream connector to Kafka.
>  
> I have a specific topic with multiple partitions in my Kafka broker. This 
> topic contains IoT sensor readings from devices, and I have to use the record 
> key in the topic to maintain the order.
>  
> The problem is that when I try to add a data stream in StreamPipes (ver 0.67.0) 
> from my broker, it shows a SerializationException while reading the schema.
>  
> "org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition ... "
>  
> Then I tried making a copy of this topic, except this time without using the 
> key. That worked smoothly, and the data can be read inside the pipeline 
> editor.
>  
> So, how are we supposed to use this stream connector? Can we use the record 
> key in the Kafka broker? And if we can't, what is the alternative?
>  
> Greetings,
> Yudistira
