You set:

> senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass());

This tells the producer to create a new KafkaAvroSerializer object
itself, and that object expects "schema.registry.url" to be set during
initialization, i.e., you need to add the config to `senderProps`.
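
To make the mechanism concrete, here is a rough, library-free sketch of what happens when a client is handed only a class. All names in it (`ReflectiveSerializerSketch`, `UrlRequiringSerializer`, `createFromClass`) are hypothetical stand-ins, not Kafka or Confluent APIs; it only mimics the "instantiate via no-arg constructor, then call configure()" pattern that the stack trace points at.

```java
import java.util.HashMap;
import java.util.Map;

public class ReflectiveSerializerSketch {

    /** Hypothetical stand-in for a serializer that requires a registry URL. */
    public static class UrlRequiringSerializer {
        // Stand-in for a mock registry client passed in by the test.
        private final String injectedClient;

        public UrlRequiringSerializer() {
            this.injectedClient = null;
        }

        public UrlRequiringSerializer(String injectedClient) {
            this.injectedClient = injectedClient;
        }

        public void configure(Map<String, ?> config) {
            if (!config.containsKey("schema.registry.url")) {
                throw new IllegalStateException(
                        "Missing required configuration \"schema.registry.url\"");
            }
        }

        public boolean hasInjectedClient() {
            return injectedClient != null;
        }
    }

    /** Mimics a client that receives only a Class: new instance, then configure(). */
    public static UrlRequiringSerializer createFromClass(
            Class<? extends UrlRequiringSerializer> type, Map<String, ?> config)
            throws ReflectiveOperationException {
        UrlRequiringSerializer fresh = type.getDeclaredConstructor().newInstance();
        fresh.configure(config);
        return fresh;
    }

    public static void main(String[] args) throws Exception {
        UrlRequiringSerializer preconfigured = new UrlRequiringSerializer("mock-client");
        Map<String, Object> config = new HashMap<>();

        try {
            // Only the class is passed on; the preconfigured instance is ignored.
            createFromClass(preconfigured.getClass(), config);
        } catch (IllegalStateException e) {
            System.out.println("construction failed: " + e.getMessage());
        }

        // Adding the URL fixes construction, but the injected client is still lost.
        config.put("schema.registry.url", "http://fake-url");
        UrlRequiringSerializer fresh = createFromClass(preconfigured.getClass(), config);
        System.out.println("fresh instance has injected client: " + fresh.hasInjectedClient());
    }
}
```

The second print shows `false`: passing `getClass()` carries over the type only, not the state of the instance you built in the test.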

However, IMHO this will not give you what you want overall, because the
producer won't use your MockSchemaRegistryClient in this case.

You would need to use

> avroSerializer = new KafkaAvroSerializer(schemaRegistry, new HashMap(defaultConfig));

outside of the producer in your tests (i.e., serialize the data
"manually"), and pass `byte[]/byte[]` types into the producer itself.

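A minimal sketch of that manual approach, under some assumptions: the class and method names (`ManualAvroSerializationSketch`, `serializeValue`, `send`) are made up for illustration, `senderProps` is whatever your test already builds, and auto-registration is left at its default so the mock client can register the schema on first use (with "auto.register.schemas" set to false, I would expect you to have to register the schema on the mock yourself first).

```java
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ManualAvroSerializationSketch {

    /** Serializes the value in the test itself, backed by the mock registry. */
    public static byte[] serializeValue(String topic, Object avroValue) {
        SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
        Map<String, Object> serdeConfig = new HashMap<>();
        // Required by the serializer config even though the mock never calls it.
        serdeConfig.put("schema.registry.url", "http://fake-url");
        KafkaAvroSerializer avroSerializer =
                new KafkaAvroSerializer(schemaRegistry, serdeConfig);
        return avroSerializer.serialize(topic, avroValue);
    }

    /** Sends pre-serialized bytes; the producer only sees byte[]/byte[]. */
    public static void send(Map<String, Object> senderProps, String topic,
                            String key, Object avroValue) throws Exception {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = serializeValue(topic, avroValue);

        Map<String, Object> props = new HashMap<>(senderProps);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topic, keyBytes, valueBytes)).get();
        }
    }
}
```

In the test from the original mail this would be called roughly as `send(senderProps, "test2", "1", log)`. On the consuming side you would need the matching deserializer built against the same mock client instance, which this sketch does not cover.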

-Matthias

On 10/29/18 2:56 PM, chinchu chinchu wrote:
> Hey folks,
> I am getting the exception below when using a MockSchemaRegistryClient in a
> JUnit test. I appreciate your help. I am using Confluent 4.0.
> 
> 
>   private  SchemaRegistryClient schemaRegistry;
>   private  KafkaAvroSerializer avroSerializer;
> 
> Properties defaultConfig = new Properties();
> defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://fake-url");
> defaultConfig.put("auto.register.schemas", false);
>     schemaRegistry = new MockSchemaRegistryClient();
>     avroSerializer = new KafkaAvroSerializer(schemaRegistry, new
> HashMap(defaultConfig));
>  Map<String, Object> senderProps  =
> KafkaTestUtils.producerProps(embeddedKafka);
> senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class);
> senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass());
> KafkaProducer<String, Log> producer = new
> KafkaProducer<String,Log>(senderProps);
> ProducerRecord<String,Log> record =new
> ProducerRecord<String,Log>("test2","1",log);
> try {
> producer.send(record).get();
> } catch (InterruptedException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> 
> Error
> ==============
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:268)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
> at
> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
> at
> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
> Caused by: io.confluent.common.config.ConfigException: Missing required
> configuration "schema.registry.url" which has no default value.
> at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
> at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
> at
> io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
> at
> io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
> at
> io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
> at
> org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
> ... 28 more
> 
