Hi Walter,

One thing I can think of: if you pass the serde object as part of your
topology definition, instead of passing the serde class in the config,
then these serde objects are not auto-configured, and so in your case
the schema registry client is never constructed and initialized.

https://issues.apache.org/jira/browse/KAFKA-3729

So if your application's topology overrides the configured serdes by
passing serde objects directly, you need to configure them manually for now.
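
As a rough illustration of that manual step, here is a minimal sketch in
plain Java. `ToyRegistrySerde` and `ManualSerdeConfigDemo` are hypothetical
stand-ins written for this example, not Kafka or Confluent classes; the toy
class only mirrors the shape of `Serde#configure(Map<String, ?>, boolean)`
and fails with a NullPointerException when used before being configured,
similar to what the stack trace in this thread suggests.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a schema-registry-backed serde. It only mirrors
// the shape of Kafka's Serde#configure(Map<String, ?>, boolean).
class ToyRegistrySerde {
    private String registryUrl; // stays null until configure() is called

    public void configure(Map<String, ?> configs, boolean isKey) {
        Object url = configs.get("schema.registry.url");
        this.registryUrl = (url == null) ? null : url.toString();
    }

    public String deserialize(int schemaId) {
        // Mimics the failure mode: the "client" is used before it exists.
        if (registryUrl == null) {
            throw new NullPointerException("registry client was never initialized");
        }
        return registryUrl + "/schemas/ids/" + schemaId;
    }
}

class ManualSerdeConfigDemo {
    // Creates the serde instance that would be handed to the topology,
    // configuring it by hand first because nothing else will do so.
    static ToyRegistrySerde configuredSerde(Map<String, ?> props) {
        ToyRegistrySerde serde = new ToyRegistrySerde();
        serde.configure(props, false); // false = used as a value serde
        return serde;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("schema.registry.url", "http://192.168.50.6:8081");

        // Passed directly without configure(): fails on first use.
        ToyRegistrySerde unconfigured = new ToyRegistrySerde();
        try {
            unconfigured.deserialize(4);
        } catch (NullPointerException e) {
            System.out.println("unconfigured serde failed: " + e.getMessage());
        }

        // Configured by hand before use: works.
        System.out.println(configuredSerde(props).deserialize(4));
    }
}
```

In a real application the same idea would apply to the actual serde
instance: call its `configure(...)` with the map that holds
`schema.registry.url` before handing the object to the topology.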


Guozhang

On Thu, Sep 22, 2016 at 5:36 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Walter,
>
> I downloaded the 0.10.0 jar and verified that the configure() function is
> auto-triggered when you get the serde classes from `context.keySerde /
> valueSerde`, which happens automatically if you use the DSL. And your Scala
> code is the same as our example code:
>
> https://github.com/confluentinc/examples/blob/030343f9acbb9f73a13ab0f0fd31690dca97c606/kafka-streams/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerializer.java
>
>
> Which demo example were you running? And are there any other jars
> co-located with the 0.10.0.0 jar that could cause another class to be loaded?
>
>
> Guozhang
>
>
> On Thu, Sep 22, 2016 at 8:41 AM, Walter rakoff <walter.rak...@gmail.com>
> wrote:
>
>> Guozhang,
>>
>> I tried your suggestion. Below is the log from Serde, Serializer
>> & Deserializer.
>> Confirmed that KafkaAvroDeserializer.configure does get invoked.
>>
>> > Line 379: 16/09/22 15:28:46 WARN GenericAvroSerdeWithSchemaRegistry: In
>> > configure {num.standby.replicas=1, replication.factor=3,
>> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
>> > schema.registry.url=http://10.200.184.41:8081,
>> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
>> > zookeeper.connect=10.200.184.26:2181, value.serde=class
>> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
>> > auto.offset.reset=earliest, num.stream.threads=1,
>> > client.id=KStreams-Test, application.id=testing-app}
>> > Line 380: 16/09/22 15:28:46 WARN GenericAvroSerializerWithSchemaRegistry:
>> > In configure{num.standby.replicas=1, replication.factor=3,
>> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
>> > schema.registry.url=http://10.200.184.41:8081,
>> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
>> > zookeeper.connect=10.200.184.26:2181, value.serde=class
>> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
>> > auto.offset.reset=earliest, num.stream.threads=1,
>> > client.id=KStreams-Test, application.id=testing-app}
>> > Line 385: 16/09/22 15:28:46 WARN
>> > GenericAvroDeserializerWithSchemaRegistry: In
>> > configure{num.standby.replicas=1, replication.factor=3,
>> > commit.interval.ms=125000, bootstrap.servers=10.200.184.29:9092,
>> > schema.registry.url=http://10.200.184.41:8081,
>> > key.serde=org.apache.kafka.common.serialization.Serdes$LongSerde,
>> > zookeeper.connect=10.200.184.26:2181, value.serde=class
>> > kstreams.serdes.GenericAvroSerdeWithSchemaRegistry,
>> > auto.offset.reset=earliest, num.stream.threads=1,
>> > client.id=KStreams-Test, application.id=testing-app}
>>
>>
>> Still the same exception
>>
>> > 16/09/22 15:28:47 INFO StreamThread: Stream thread shutdown complete
>> > [StreamThread-1]
>> > Exception in thread "StreamThread-1"
>> > org.apache.kafka.common.errors.SerializationException: Error deserializing
>> > Avro message for id 7
>> > Caused by: java.lang.NullPointerException
>> >         at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
>> >         at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
>>
>>
>>
>> Walter
>>
>> On Mon, Sep 19, 2016 at 8:25 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>> > Hello Walter,
>> >
>> > The WARN log entry should not be the cause of this issue.
>> >
>> > I double checked the 0.10.0.0 release and this issue should not really
>> > happen, so your observation is a bit weird to me. Could you add a log
>> > entry in the `configure` function which constructs the registry client
>> > to make sure it is indeed triggered when the streams app starts up?
>> >
>> >
>> > Guozhang
>> >
>> >
>> >
>> > On Fri, Sep 16, 2016 at 2:27 PM, Walter rakoff <walter.rak...@gmail.com
>> >
>> > wrote:
>> >
>> > > Guozhang,
>> > >
>> > > Any clues on this one?
>> > >
>> > > Walter
>> > >
>> > > On Wed, Sep 14, 2016 at 9:46 PM, Walter rakoff <
>> walter.rak...@gmail.com>
>> > > wrote:
>> > >
>> > > > Guozhang,
>> > > >
>> > > > I am using 0.10.0.0. Could the below log be the cause?
>> > > >
>> > > > 16/09/14 17:24:35 WARN ConsumerConfig: The configuration
>> > > > schema.registry.url = http://192.168.50.6:8081
>> > > > was supplied but isn't a known config.
>> > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka version : 0.10.0.0
>> > > > 16/09/14 17:24:35 INFO AppInfoParser: Kafka commitId : b8642491e78c5a13
>> > > >
>> > > > Walter
>> > > >
>> > > >
>> > > > On Wed, Sep 14, 2016 at 8:11 PM, Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> > > >
>> > > >> Hello Walter,
>> > > >>
>> > > >> Which version of Kafka were you using?
>> > > >>
>> > > >> I ask this because there was a bug causing the serde passed through
>> > > >> config to NOT be configured when constructed:
>> > > >> https://issues.apache.org/jira/browse/KAFKA-3639
>> > > >>
>> > > >>
>> > > >> Which is fixed in the 0.10.0.0 release, which means you will only hit
>> > > >> it if you are using the tech-preview release version.
>> > > >>
>> > > >>
>> > > >> Guozhang
>> > > >>
>> > > >>
>> > > >> On Wed, Sep 14, 2016 at 11:10 AM, Walter rakoff <
>> > > walter.rak...@gmail.com>
>> > > >> wrote:
>> > > >>
>> > > >> > Hello,
>> > > >> >
>> > > >> > I get the below exception when deserializing Avro records
>> > > >> > using KafkaAvroDeserializer.
>> > > >> >
>> > > >> > 16/09/14 17:24:39 INFO StreamThread: Stream thread shutdown complete
>> > > >> > [StreamThread-1]
>> > > >> > Exception in thread "StreamThread-1"
>> > > >> > org.apache.kafka.common.errors.SerializationException:
>> > > >> > Error deserializing Avro message for id 4
>> > > >> > Caused by: java.lang.NullPointerException
>> > > >> >         at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:120)
>> > > >> >         at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
>> > > >> >
>> > > >> > I can confirm that the schema registry URL is accessible and
>> > > >> > url/schemas/ids/4 does return a valid schema.
>> > > >> > Maybe some initialization didn't happen correctly?
>> > > >> >
>> > > >> >     props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "192.168.50.6:8081")
>> > > >> >     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long.getClass.getName)
>> > > >> >     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerdeWithSchemaRegistry])
>> > > >> >
>> > > >> > GenericAvroSerdeWithSchemaRegistry code -->
>> > > >> > https://www.dropbox.com/s/y471k9nj94tlxro/avro_serde.txt?dl=0
>> > > >> >
>> > > >> > Walter
>> > > >> >
>> > > >>
>> > > >>
>> > > >>
>> > > >> --
>> > > >> -- Guozhang
>> > > >>
>> > > >
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
