Hi Yudistira,
thanks for sharing your problem with us.
Unfortunately, the Kafka adapter currently hard-codes a Long deserializer for
the record key when it creates the consumer [1]. See the following method:
    private static Consumer<Long, String> createConsumer(String broker, String topic) {
        ...
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        ...
    }
That would explain why you can’t read your original topic: its partition key
can’t be deserialized into a long.
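To illustrate, here is a minimal sketch in plain Java of what I think happens
on the consumer side. It does not use the Kafka client; decodeLongKey is a
hypothetical stand-in that mimics how, as far as I know, Kafka's
LongDeserializer behaves (null stays null, anything that is not exactly 8 bytes
is rejected), and "device-42" is just an assumed example key, not your real one.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class KeyDecodeSketch {

    // Hypothetical stand-in for the check a Long key deserializer performs:
    // a missing key passes through as null, any other payload must be 8 bytes.
    static Long decodeLongKey(byte[] data) {
        if (data == null) {
            return null; // records without a key are accepted
        }
        if (data.length != 8) {
            throw new IllegalArgumentException(
                "Size of key data is not 8 (got " + data.length + " bytes)");
        }
        return ByteBuffer.wrap(data).getLong(); // big-endian 64-bit value
    }

    // What a String key deserializer would do with the same bytes.
    static String decodeStringKey(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Assumed example key; a device ID written as a UTF-8 string.
        byte[] deviceKey = "device-42".getBytes(StandardCharsets.UTF_8);

        System.out.println("as String: " + decodeStringKey(deviceKey));
        System.out.println("no key:    " + decodeLongKey(null));
        try {
            decodeLongKey(deviceKey);
        } catch (IllegalArgumentException e) {
            System.out.println("as Long:   rejected, " + e.getMessage());
        }
    }
}
```

This would also fit your observation that the copy of the topic without keys
works: a record with no key never reaches the length check.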
I assume you use the device ID as the partition key? What does your key look
like?
In general, I guess this is an issue that we should address.
Greetings,
Patrick
[1]
https://github.com/apache/incubator-streampipes-extensions/blob/rel/0.67.0/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/KafkaProtocol.java
> On 10.02.2021 at 06:28, Yudistira Hanifmuti <[email protected]> wrote:
>
> Hello everyone,
>
> I have a problem with using the data stream connector to Kafka.
>
> I have a specific topic with multiple partitions in my Kafka broker. This
> topic contains IoT sensor readings from devices, and I have to use the record
> key in the topic to maintain the order.
>
> The problem is that when I try to add a data stream in StreamPipes (ver 0.67.0)
> from my broker, it shows a SerializationException during schema reading.
>
> "org.apache.kafka.common.errors.SerializationException: Error deserializing
> key/value for partition ... "
>
> Then I tried making a copy of this topic, except this time without the key.
> That worked smoothly, and the data can be read inside the pipeline editor.
>
> So, how are we supposed to use this stream connector? Can we use the record
> key in the Kafka broker? And if we can't, what is the alternative?
>
> Greetings,
> Yudistira