Hi Patrick,

Thank you for your reply and explanation. I can see the problem clearly.

That's right, we use the device ID as the partition key, and it is a String
rather than a Long. That's why the serialization error occurs.
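
Just to illustrate what happens (the device ID "sensor-42" below is a
made-up example from my side): Kafka's LongDeserializer expects exactly 8
bytes, so any String key of a different length fails immediately.

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyMismatchDemo {
    public static void main(String[] args) {
        // Our producer serializes the device ID as a String key.
        byte[] keyBytes = new StringSerializer().serialize("my-topic", "sensor-42");

        // The adapter's LongDeserializer requires exactly 8 bytes, so this
        // throws org.apache.kafka.common.errors.SerializationException.
        Long key = new LongDeserializer().deserialize("my-topic", keyBytes);
        System.out.println(key);
    }
}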

So, what is the plan for this issue?
I'm building a proof of concept using StreamPipes. For now, I can work
around the problem by removing the key, but I hope the issue can be resolved.
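
For anyone hitting the same thing, my workaround is essentially a small
re-publisher that copies the records to a second topic with a null key.
The broker address and topic names below are placeholders from my setup:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeylessRepublisher {
    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        cp.put("group.id", "republisher");
        cp.put("auto.offset.reset", "earliest");
        cp.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        cp.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        pp.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            consumer.subscribe(List.of("device-readings")); // placeholder source topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records) {
                    // Re-publish with a null key so the StreamPipes adapter
                    // never runs its LongDeserializer on the key bytes.
                    producer.send(new ProducerRecord<>("device-readings-nokey", null, r.value()));
                }
            }
        }
    }
}

The obvious downside is that without the key, records for the same device
are spread round-robin across partitions, so per-device ordering is lost.
That is why I would still prefer a proper fix in the adapter.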

Greetings,
Yudistira

On Thu, 11 Feb 2021 at 01:23, wiener <[email protected]> wrote:

> Hi Yudistira,
>
> thanks for sharing your problem with us.
>
> Unfortunately, we currently use a Long deserializer as part of the Kafka
> adapter creation process [1].
>
> See the following method:
>
> private static Consumer<Long, String> createConsumer(String broker, String topic) {
>     ...
>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>             LongDeserializer.class.getName());
>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>             StringDeserializer.class.getName());
>     ...
> }
>
> That would explain why you can't read your original topic: its partition
> key can't be deserialized into a Long.
> I assume you use the device ID as the partition key? What does your key
> look like?
>
> In general, I guess this is an issue that we should address.
>
> Greetings,
> Patrick
>
>
> [1]
> https://github.com/apache/incubator-streampipes-extensions/blob/rel/0.67.0/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/KafkaProtocol.java
>
> On 10.02.2021 at 06:28, Yudistira Hanifmuti <[email protected]> wrote:
>
> Hello everyone,
>
>
> I have a problem using the Kafka data stream connector.
>
>
> I have a specific topic with multiple partitions in my Kafka broker. This
> topic contains IoT sensor readings from devices, and I have to use the
> record key to maintain ordering.
>
>
> The problem is that when I add a data stream in StreamPipes (ver. 0.67.0)
> from my broker, it shows a SerializationException during schema reading:
>
>
> "org.apache.kafka.common.errors.SerializationException: Error
> deserializing key/value for partition ... "
>
>
> Then I made a copy of this topic, this time without the key. That worked:
> the data can be read inside the pipeline editor.
>
>
> So, how are we supposed to use this stream connector? Can we use the
> record key in the Kafka broker? And if not, what is the alternative?
>
>
> Greetings,
> Yudistira
>
>
>
