Hi,

Currently, I'd suggest you create an issue for that on our Jira [1] so we
can track it.

This is a minor issue that could be resolved quite quickly by simply changing
the key deserializer to the String one in the class mentioned in my earlier
mail (quoted below).
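
Roughly, the change in KafkaProtocol's createConsumer method could look like
this (a minimal sketch; I'm assuming everything besides the key deserializer
and the consumer's key type parameter stays as it is today):

private static Consumer<String, String> createConsumer(String broker, String topic) {
    ...
    // use the String deserializer for keys, so topics with String
    // partition keys (e.g. device IDs) can be read
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
    ...
}

Any downstream code that reads the key as a Long would need to follow the
type change.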

Also, we would really appreciate it if you could provide us with a pull
request resolving the issue, especially if you need it fixed quickly.

Greetings,
Patrick


[1] https://issues.apache.org/jira/projects/STREAMPIPES/issues/STREAMPIPES-284?filter=allopenissues

> On 11.02.2021 at 06:13, Yudistira Hanifmuti <[email protected]> wrote:
> 
> Hi Patrick,
> 
> Thank you for your reply and explanation. I can see the problem clearly.
> 
> That's right, we use the device ID as the partition key, and it is a String
> instead of a Long. That's why it causes the serialization error.
> 
> So, what is your plan for this issue?
> I'm building a proof of concept using StreamPipes. For now, I can still work
> around this problem by removing the key. I hope this issue can be resolved.
> 
> Greetings,
> Yudistira
> 
> On Thu, 11 Feb 2021 at 01:23, wiener <[email protected]> wrote:
> Hi Yudistira,
> 
> thanks for sharing your problem with us.
> 
> Unfortunately, we currently use a Long deserializer as part of the Kafka
> adapter creation process [1]; see the following method:
> 
> private static Consumer<Long, String> createConsumer(String broker, String topic) {
>     ...
>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>         LongDeserializer.class.getName());
>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>         StringDeserializer.class.getName());
>     ...
> }
> 
> That would explain why you can't read your original topic: its partition key
> can't be deserialized into a Long. I assume you use the device ID as the
> partition key? What does your key look like?
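> 
> For illustration: Kafka's LongDeserializer requires the key to be exactly
> 8 bytes, so a typical String key fails right away (a small sketch; the
> "device-42" key is just an assumed example):
> 
> byte[] key = "device-42".getBytes(StandardCharsets.UTF_8);
> new LongDeserializer().deserialize("topic", key);
> // throws org.apache.kafka.common.errors.SerializationException
> // because the encoded String is not exactly 8 bytes long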
> 
> In general, I guess this is an issue that we should address.
> 
> Greetings,
> Patrick
> 
> 
> [1] https://github.com/apache/incubator-streampipes-extensions/blob/rel/0.67.0/streampipes-connect-adapters/src/main/java/org/apache/streampipes/connect/protocol/stream/KafkaProtocol.java
> 
>> On 10.02.2021 at 06:28, Yudistira Hanifmuti <[email protected]> wrote:
>> 
>> Hello everyone,
>>  
>> I have a problem using the data stream connector for Kafka.
>>  
>> I have a specific topic with multiple partitions in my Kafka broker. This
>> topic contains IoT sensor readings from devices, and I have to use the
>> record key in the topic to maintain the ordering.
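>> 
>> For context, the records are produced with the device ID as the key,
>> roughly like this (a hedged sketch; the topic and variable names are
>> made up):
>> 
>> producer.send(new ProducerRecord<>("sensor-readings", deviceId, payloadJson));
>> // same deviceId -> same partition, so per-device ordering is preserved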
>>  
>> The problem is that when I want to add a data stream in StreamPipes (ver
>> 0.67.0) from my broker, it fails with a SerializationException while
>> reading the schema:
>>  
>> "org.apache.kafka.common.errors.SerializationException: Error deserializing 
>> key/value for partition ... "
>>  
>> Then, I tried to make a copy of this topic, except this time without the
>> key. The result was that everything went smoothly and the data could be
>> read inside the pipeline editor.
>>  
>> So, how are we supposed to use this stream connector? Can we use the
>> record key in the Kafka broker? And if we can't, what is the alternative?
>>  
>> Greetings,
>> Yudistira
> 
