Re: Exception while deserializing in kafka streams

2016-09-27 Thread Walter rakoff
Ah, that was it. I was passing the same Serde while creating the topology.
It works after I removed it.
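
A minimal sketch of the arrangement described above, assuming the 0.10.0.x
Streams API (KStreamBuilder) with kafka-streams on the classpath. The topic
name is hypothetical, and Serdes.String is only a stand-in for
GenericAvroSerdeWithSchemaRegistry so that the snippet compiles on its own.
The point is that no serde object is passed to builder.stream(), so the
serdes named in the config are the ones used.

```scala
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing-app")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.200.184.29:9092")
// Default serdes are named by class, so Streams creates and configures them.
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass.getName)
// Stand-in: in this thread the value is classOf[GenericAvroSerdeWithSchemaRegistry].
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
// Per the configure() logs in this thread, extra keys like this reach the serde.
props.put("schema.registry.url", "http://10.200.184.41:8081")

val builder = new KStreamBuilder()
// No serde arguments here, so the defaults from props apply to this source.
// "input-topic" is a hypothetical topic name.
val records: KStream[java.lang.Long, String] =
  builder.stream[java.lang.Long, String]("input-topic")
```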

Thanks!

Walter


Re: Exception while deserializing in kafka streams

2016-09-26 Thread Guozhang Wang
Hi Walter,

One thing I can think of: if you pass the serde object as part of your
topology definition, instead of passing the serde class in the config,
then these serde objects will not be auto-configured, and so in your
case the schema registry client will not be constructed and initialized.

https://issues.apache.org/jira/browse/KAFKA-3729

So if your application's topology overrides the default serdes by passing
serde objects directly, you need to configure them manually for now.
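
A minimal sketch of that manual step, assuming only kafka-clients on the
classpath. RegistryBackedSerde is a hypothetical stand-in for a
schema-registry-backed serde like the one in this thread: it refuses to
deserialize until configure() has supplied schema.registry.url, which mimics
the NullPointerException reported here. The registry URL is a placeholder.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.{HashMap => JHashMap, Map => JMap, Objects}
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

// Hypothetical stand-in for a schema-registry-backed serde.
class RegistryBackedSerde extends Serde[String] {
  // Plays the role of the registry client; stays null until configure() runs.
  private var registryUrl: String = null

  override def configure(configs: JMap[String, _], isKey: Boolean): Unit =
    registryUrl = configs.get("schema.registry.url").asInstanceOf[String]

  override def serializer(): Serializer[String] = new Serializer[String] {
    override def configure(configs: JMap[String, _], isKey: Boolean): Unit = ()
    override def serialize(topic: String, data: String): Array[Byte] =
      data.getBytes(UTF_8)
    override def close(): Unit = ()
  }

  override def deserializer(): Deserializer[String] = new Deserializer[String] {
    override def configure(configs: JMap[String, _], isKey: Boolean): Unit = ()
    override def deserialize(topic: String, data: Array[Byte]): String = {
      // Throws NullPointerException if the serde was never configured.
      Objects.requireNonNull(registryUrl, "serde was not configured")
      new String(data, UTF_8)
    }
    override def close(): Unit = ()
  }

  override def close(): Unit = ()
}

val serdeConfig = new JHashMap[String, AnyRef]()
serdeConfig.put("schema.registry.url", "http://192.168.50.6:8081")

val valueSerde = new RegistryBackedSerde()
// The manual step: needed for serde instances passed into the topology.
valueSerde.configure(serdeConfig, false) // false = value serde, not key serde

println(valueSerde.deserializer().deserialize("some-topic", "hello".getBytes(UTF_8)))
```

The configured instance is then the one handed to the topology, for example
as the value serde argument of builder.stream(keySerde, valueSerde, topic).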


Guozhang


Re: Exception while deserializing in kafka streams

2016-09-22 Thread Guozhang Wang
Hi Walter,

I downloaded the 0.10.0 jar and verified that the configure() function is
called automatically when the serdes are obtained from `context.keySerde /
valueSerde`, which is what happens when you use the DSL. Your Scala
code also matches our example code:

https://github.com/confluentinc/examples/blob/030343f9acbb9f73a13ab0f0fd31690dca97c606/kafka-streams/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerializer.java


Which demo example were you running? And are there any other jars
co-located with the 0.10.0.0 jar that could cause another class to be loaded?


Guozhang



Re: Exception while deserializing in kafka streams

2016-09-22 Thread Walter rakoff
Guozhang,

I tried your suggestion. Below is the log from Serde, Serializer
& Deserializer.
Confirmed that KafkaAvroDeserializer.configure does get invoked.

Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In
configure {num.standby.replicas=1, replication.factor=3,
commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
schema.registry.url=http://10.200.184.41:8081,
key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
zookeeper.connect=10.200.184.26:2181, value.serde=class
kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test,
application.id=testing-app}

Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchemaRegistry:
In configure{num.standby.replicas=1, replication.factor=3,
commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
schema.registry.url=http://10.200.184.41:8081,
key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
zookeeper.connect=10.200.184.26:2181, value.serde=class
kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test,
application.id=testing-app}

Line 385: 16/09/22 15:28:46 WARN GenericAvroDeserializerWithSchemaRegistry:
In configure{num.standby.replicas=1, replication.factor=3,
commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
schema.registry.url=http://10.200.184.41:8081,
key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
zookeeper.connect=10.200.184.26:2181, value.serde=class
kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
auto.offset.reset=earliest, num.stream.threads=1, client.id=KStreams-Test,
application.id=testing-app}


Still the same exception

16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete [StreamThread-1]
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 7
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)



Walter


Re: Exception while deserializing in kafka streams

2016-09-19 Thread Guozhang Wang
Hello Walter,

The WARN log entry should not be the cause of this issue.

I double-checked the 0.10.0.0 release and this issue should not really
happen, so your observation is a bit weird to me. Could you add a log
entry in the `configure` function that constructs the registry client, to
make sure it is indeed triggered when the streams app starts up?
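
One way to add such a log entry, sketched under the assumption that only
kafka-clients is on the classpath. LoggingDeserializer is a hypothetical
wrapper, and StringDeserializer stands in for the Avro deserializer so the
snippet runs on its own; println would be a real logger in an application.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}

// Hypothetical wrapper: logs every configure() call, then delegates.
class LoggingDeserializer[T](inner: Deserializer[T]) extends Deserializer[T] {
  // Counts configure() invocations so a missing call is easy to spot.
  var configureCalls = 0

  override def configure(configs: JMap[String, _], isKey: Boolean): Unit = {
    configureCalls += 1
    println(s"In configure (isKey=$isKey): $configs")
    inner.configure(configs, isKey)
  }

  override def deserialize(topic: String, data: Array[Byte]): T =
    inner.deserialize(topic, data)

  override def close(): Unit = inner.close()
}

val deserializer = new LoggingDeserializer(new StringDeserializer())
val config = new JHashMap[String, AnyRef]()
config.put("schema.registry.url", "http://192.168.50.6:8081")

// In a Streams app this call is expected to come from the framework at startup.
deserializer.configure(config, false)
println(s"configure() calls so far: ${deserializer.configureCalls}")
```

If the log line never shows up for the instance that actually deserializes
records, that instance was not configured.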


Guozhang





Re: Exception while deserializing in kafka streams

2016-09-16 Thread Walter rakoff
Guozhang,

Any clues on this one?

Walter



Re: Exception while deserializing in kafka streams

2016-09-14 Thread Walter rakoff
Guozhang,

I am using 0.10.0.0. Could the below log be the cause?

16/09/14 17:24:35 WARN ConsumerConfig: The configuration schema.registry.url = http://192.168.50.6:8081 was supplied but isn't a known config.
16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13

Walter




Re: Exception while deserializing in kafka streams

2016-09-14 Thread Guozhang Wang
Hello Walter,

Which version of Kafka were you using?

I ask because there was a bug that caused a serde passed through the config
to not be configured when it was constructed:
https://issues.apache.org/jira/browse/KAFKA-3639


That bug is fixed in the 0.10.0.0 release, so you would only hit it if
you are using the tech-preview release.


Guozhang




Exception while deserializing in kafka streams

2016-09-14 Thread Walter rakoff
Hello,

I get the exception below when deserializing Avro records
using KafkaAvroDeserializer.

16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete [StreamThread-1]
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 4
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)

I can confirm that the schema registry URL is accessible and that
url/schemas/ids/4 does return a valid schema.
Maybe some initialization didn't happen correctly?

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "192.168.50.6:8081")
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long.getClass.getName)
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerdeWithSchemaRegistry])

GenericAvroSerdeWithSchemaRegistry code --> https://www.dropbox.com/s/y471k9nj94tlxro/avro_serde.txt?dl=0

Walter