Hi Patrick, Thank you for your reply and explanation. I can see the problem clearly.
That's right, we use the device ID as a partition key and it is a String instead of Long. That's why it gives the serialization error. So, what is your plan on this issue? I'm building a proof of concept using StreamPipes. For now, I can still get around this problem by removing the key. I hope this issue can be resolved. Greetings, Yudistira Pada tanggal Kam, 11 Feb 2021 pukul 01.23 wiener <[email protected]> menulis: > Hi Yudistira, > > thanks for sharing your problem with us. > > Unfortunately, we currently use a Long deserializer as part of the Kafka > adapter creation process [1] > > see following method: > > private static Consumer<Long, String> createConsumer(String broker, String > topic) { > ... > props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > LongDeserializer.class.getName()); > props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class.getName()); > … > } > > That would explain why you can’t deserialize your original topic with a > partition key that can’t be deserialized into long. > I assume, you have the device ID as a partition key? How does your key > look like? > > In general, I guess this is an issue that we should address. > > Greetings, > Patrick > > > [1] > https://github.com/apache/incubator-streampipes-extensions/blob/rel/0.67.0/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/KafkaProtocol.javaa > > Am 10.02.2021 um 06:28 schrieb Yudistira Hanifmuti <[email protected]>: > > Hello everyone, > > > I have a problem with using the data stream connector to Kafka. > > > I have a specific topic with multiple partitions in my Kafka broker. This > topic contains IoT sensors readings from devices and I have to use the key > in the topic to maintain the order. > > > The problem is when I want to add a data stream in StreamPipes (ver > 0.67.0) from my broker, it shows an error of SerializationException in > schema reading. > > > "org.apache.kafka.common.errors.SerializationException: Error > deserializing key/value for partition ... " > > > Then, I tried to make a copy of this topic, except this time the key was > not used. The result was it goes smoothly and the data can be read inside > the pipeline editor. > > > So, how are we supposed to use this stream connector? Could we use the > record key in the Kafka broker? And if we couldn't, what is the > alternative? > > > Greetings, > Yudistira > > >
