Exception while deserializing in kafka streams

2016-09-14 Thread Walter rakoff
Hello,

I get the below exception when deserilaizing avro records
using KafkaAvroDeserializer.

16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete
[StreamThread-1]
Exception in thread "StreamThread-1"
org.apache.kafka.common.errors.SerializationException:
Error deserializing Avro message for id 4
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
deserialize(AbstractKafkaAvroDeserializer.java:120)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
deserialize(AbstractKafkaAvroDeserializer.java:92)

I can confirm that schema registry URL is accessible and url/schemas/ids/4
does return valid schema.
May be some initialization didn't happen correctly?

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "
192.168.50.6:8081")
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.Long.getClass.getName)
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[
GenericAvroSerdeWithSchemaRegistry])

GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/
y471k9nj94tlxro/avro_serde.txt?dl=0

Walter


Re: Exception while deserializing in kafka streams

2016-09-14 Thread Walter rakoff
Guozhang,

I am using 0.10.0.0. Could the below log be the cause?

16/09/14 17:24:35 WARN ConsumerConfig: The configuration
schema.registry.url = http://192.168.50.6: <http://192.168.50.6:8081/>8081
was supplied but isn't a known config.
16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13

Walter


On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Walter,
>
> Which version of Kafka were you using?
>
> I ask this because there was a bug causing the serde passed through config
> to NOT being configured when constructed:
> https://issues.apache.org/jira/browse/KAFKA-3639
>
>
> Which is fixed in the 0.10.0.0 release, which means you will only hit it if
> you are using the tech-preview release version.
>
>
> Guozhang
>
>
> On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff <walter.rak...@gmail.com>
> wrote:
>
> > Hello,
> >
> > I get the below exception when deserilaizing avro records
> > using KafkaAvroDeserializer.
> >
> > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete
> > [StreamThread-1]
> > Exception in thread "StreamThread-1"
> > org.apache.kafka.common.errors.SerializationException:
> > Error deserializing Avro message for id 4
> > Caused by: java.lang.NullPointerException
> > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
> > deserialize(AbstractKafkaAvroDeserializer.java:120)
> > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.
> > deserialize(AbstractKafkaAvroDeserializer.java:92)
> >
> > I can confirm that schema registry URL is accessible and
> url/schemas/ids/4
> > does return valid schema.
> > May be some initialization didn't happen correctly?
> >
> > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "
> > 192.168.50.6:8081")
> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.Long.getClass.getName)
> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[
> > GenericAvroSerdeWithSchemaRegistry])
> >
> > GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/
> > y471k9nj94tlxro/avro_serde.txt?dl=0
> >
> > Walter
> >
>
>
>
> --
> -- Guozhang
>


Re: Exception while deserializing in kafka streams

2016-09-16 Thread Walter rakoff
Guozhang,

Any clues on this one?

Walter

On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff <walter.rak...@gmail.com>
wrote:

> Guozhang,
>
> I am using 0.10.0.0. Could the below log be the cause?
>
> 16/09/14 17:24:35 WARN ConsumerConfig: The configuration
> schema.registry.url = http://192.168.50.6: <http://192.168.50.6:8081/>8081
> was supplied but isn't a known config.
> 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
> 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
>
> Walter
>
>
> On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Walter,
>>
>> Which version of Kafka were you using?
>>
>> I ask this because there was a bug causing the serde passed through config
>> to NOT being configured when constructed:
>> https://issues.apache.org/jira/browse/KAFKA-3639
>>
>>
>> Which is fixed in the 0.10.0.0 release, which means you will only hit it
>> if
>> you are using the tech-preview release version.
>>
>>
>> Guozhang
>>
>>
>> On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff <walter.rak...@gmail.com>
>> wrote:
>>
>> > Hello,
>> >
>> > I get the below exception when deserilaizing avro records
>> > using KafkaAvroDeserializer.
>> >
>> > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete
>> > [StreamThread-1]
>> > Exception in thread "StreamThread-1"
>> > org.apache.kafka.common.errors.SerializationException:
>> > Error deserializing Avro message for id 4
>> > Caused by: java.lang.NullPointerException
>> > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
>> .
>> > deserialize(AbstractKafkaAvroDeserializer.java:120)
>> > at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
>> .
>> > deserialize(AbstractKafkaAvroDeserializer.java:92)
>> >
>> > I can confirm that schema registry URL is accessible and
>> url/schemas/ids/4
>> > does return valid schema.
>> > May be some initialization didn't happen correctly?
>> >
>> > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> "
>> > 192.168.50.6:8081")
>> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > Serdes.Long.getClass.getName)
>> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[
>> > GenericAvroSerdeWithSchemaRegistry])
>> >
>> > GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/
>> > y471k9nj94tlxro/avro_serde.txt?dl=0
>> >
>> > Walter
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


Re: Exception while deserializing in kafka streams

2016-09-22 Thread Walter rakoff
Guozhang,

I tried your suggestion. Below is the log from Serde, Serializer
& Deserializer.
Confirmed that KafkaAvroDeserializer.configure does get invoked.

Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In
> configure {num.standby.replicas=1, replication.factor=3,
> commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> schema.registry.url=http://10.200.184.41:8081,
> key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> zookeeper.connect=10.200.184.26:2181, value.serde=class
> kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test,
> application.id=testing-app}
> Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchemaRegistry:
> In configure{num.standby.replicas=1, replication.factor=3,
> commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> schema.registry.url=http://10.200.184.41:8081,
> key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> zookeeper.connect=10.200.184.26:2181, value.serde=class
> kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test,
> application.id=testing-app}
> Line 385: 16/09/22 15:28:46 WARN
> GenericAvroDeserializerWithSchemaRegistry: In
> configure{num.standby.replicas=1, replication.factor=3, 
> commit.interval.ms=125000,
> bootstrap.servers=10.200.184.29:9092, schema.registry.url=
> http://10.200.184.41:8081,
> key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> zookeeper.connect=10.200.184.26:2181, value.serde=class
> kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test,
> application.id=testing-app}


Still the same exception

16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete
> [StreamThread-1]
> Exception in thread "StreamThread-1"
> org.apache.kafka.common.errors.SerializationException: Error deserializing
> Avro message for id 7
> Caused by: java.lang.NullPointerException
> at
> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
> at
> io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)



Walter

On Mon, Sep 19, 2016 at 8:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Walter,
>
> The WARN log entry should not be the cause of this issue.
>
> I double checked the 0.10.0.0 release and this issue should not really
> happen, so your observation is a bit weird to me. Could your add a log
> entry in the `configure` function which constructs the registry client to
> make sure it is indeed triggered when the streams app start up?
>
>
> Guozhang
>
>
>
> On Fri, Sep 16, 2016 at 2:27 PM, Walter rakoff <walter.rak...@gmail.com>
> wrote:
>
> > Guozhang,
> >
> > Any clues on this one?
> >
> > Walter
> >
> > On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff <walter.rak...@gmail.com>
> > wrote:
> >
> > > Guozhang,
> > >
> > > I am using 0.10.0.0. Could the below log be the cause?
> > >
> > > 16/09/14 17:24:35 WARN ConsumerConfig: The configuration
> > > schema.registry.url = http://192.168.50.6: <http://192.168.50.6:8081/>
> > 8081
> > > was supplied but isn't a known config.
> > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
> > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
> > >
> > > Walter
> > >
> > >
> > > On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > >> Hello Walter,
> > >>
> > >> Which version of Kafka were you using?
> > >>
> > >> I ask this because there was a bug causing the serde passed through
> > config
> > >> to NOT being configured when constructed:
> > >> https://issues.apache.org/jira/browse/KAFKA-3639
> > >>
> > >>
> > >> Which is fixed in the 0.10.0.0 release, which means you will only hit
> it
> > >> if
> > >> you are using the tech-preview release version.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff <
> > walter.rak...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hello,
> > >> >
> > >> > I get the below exception when deserilaizing avro records
>

Re: Exception while deserializing in kafka streams

2016-09-27 Thread Walter rakoff
Ah, that was it. I was passing the same Serde while creating the topology.
It works after I removed it.

Thanks!

Walter

On Mon, Sep 26, 2016 at 1:16 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Walter,
>
> One thing I can think of is that, if you pass the serde object as part of
> your topology definition, instead of passing the serde class in the config,
> then these serde objects will not be auto configured and hence for your
> case the schema registry client will not be constructed and initialized.
>
> https://issues.apache.org/jira/browse/KAFKA-3729
>
> So in case your application's topology does overwrite serdes with direct
> serde object passing, you need to configure them manually for now.
>
>
> Guozhang
>
> On Thu, Sep 22, 2016 at 5:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Walter,
> >
> > I downloaded the 0.10.0 jar and verified that the configure() function is
> > auto-triggered when you get the serde classes from `context.keySerde /
> > valueSerde`, which is auto-triggered if you use the DSL. And your Scala
> > code is the same as to our examples code:
> >
> > https://github.com/confluentinc/examples/blob/
> > 030343f9acbb9f73a13ab0f0fd31690dca97c606/kafka-streams/src/
> > main/java/io/confluent/examples/streams/utils/GenericAvroSerializer.java
> >
> >
> > Which demo example were you running? And are there any other jars
> > co-located with the 0.10.0.0 jar that could cause another class be
> loaded?
> >
> >
> > Guozhang
> >
> >
> > On Thu, Sep 22, 2016 at 8:41 AM, Walter rakoff <walter.rak...@gmail.com>
> > wrote:
> >
> >> Guozhang,
> >>
> >> I tried your suggestion. Below is the log from Serde, Serializer
> >> & Deserializer.
> >> Confirmed that KafkaAvroDeserializer.configure does get invoked.
> >>
> >> Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In
> >> > configure {num.standby.replicas=1, replication.factor=3,
> >> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> >> > schema.registry.url=http://10.200.184.41:8081,
> >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> >> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> >> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> >> =KStreams-Test,
> >> > application.id=testing-app}
> >> > Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchem
> >> aRegistry:
> >> > In configure{num.standby.replicas=1, replication.factor=3,
> >> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
> >> > schema.registry.url=http://10.200.184.41:8081,
> >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> >> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> >> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> >> =KStreams-Test,
> >> > application.id=testing-app}
> >> > Line 385: 16/09/22 15:28:46 WARN
> >> > GenericAvroDeserializerWithSchemaRegistry: In
> >> > configure{num.standby.replicas=1, replication.factor=3,
> >> commit.interval.ms=125000,
> >> > bootstrap.servers=10.200.184.29:9092, schema.registry.url=
> >> > http://10.200.184.41:8081,
> >> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
> >> > zookeeper.connect=10.200.184.26:2181, value.serde=class
> >> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
> >> > auto.offset.reset=earliest, num.stream.threads=1, client.id
> >> =KStreams-Test,
> >> > application.id=testing-app}
> >>
> >>
> >> Still the same exception
> >>
> >> 16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete
> >> > [StreamThread-1]
> >> > Exception in thread "StreamThread-1"
> >> > org.apache.kafka.common.errors.SerializationException: Error
> >> deserializing
> >> > Avro message for id 7
> >> > Caused by: java.lang.NullPointerException
> >> > at
> >> > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
> >> .deserialize(AbstractKafkaAvroDeserializer.java:120)
> >> > at
> >> > io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
> >> .d