You set:

> senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass());

This tells the Producer to create a new KafkaAvroSerializer object itself, and that object expects "schema.registry.url" to be set during initialization, i.e., you need to add the config to `senderProps`.

However, IMHO this will not give you what you want overall, as the Producer won't use your MockSchemaRegistryClient in this case. You would need to use

> avroSerializer = new KafkaAvroSerializer(schemaRegistry, new
> HashMap(defaultConfig));

outside of the producer in your tests (i.e., serialize the data "manually"), and pass `byte[]/byte[]` types into the producer itself.

-Matthias

On 10/29/18 2:56 PM, chinchu chinchu wrote:
> Hey folks,
> I am getting the below exception when using a mockSchemaRegsitry in a
> junit test . I appreciate your help.I am using confluent 4.0
>
> private SchemaRegistryClient schemaRegistry;
> private KafkaAvroSerializer avroSerializer;
>
> Properties defaultConfig = new Properties();
> defaultConfig.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://fake-url");
> defaultConfig.put("auto.register.schemas", false);
> schemaRegistry = new MockSchemaRegistryClient();
> avroSerializer = new KafkaAvroSerializer(schemaRegistry, new HashMap(defaultConfig));
> Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
> senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,avroSerializer.getClass());
> KafkaProducer<String, Log> producer = new KafkaProducer<String,Log>(senderProps);
> ProducerRecord<String,Log> record =new ProducerRecord<String,Log>("test2","1",log);
> try {
>     producer.send(record).get();
> } catch (InterruptedException e) {
>     // TODO Auto-generated catch block
>     e.printStackTrace();
> }
>
> Error
> ==============
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:268)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
>     at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
>     at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)
> Caused by: io.confluent.common.config.ConfigException: Missing required
> configuration "schema.registry.url" which has no default value.
>     at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
>     at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
>     at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
>     at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
>     at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
>     at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
>     ... 28 more
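
To illustrate the point in the reply (handing the producer a class rather than an instance means the mock-backed serializer built in the test is never used), here is a minimal, self-contained sketch. It deliberately uses no Kafka or Confluent classes: `Registry`, `MockRegistry`, `DemoSerializer`, and `createFromClass` are hypothetical stand-ins invented for this example, and the reflection step only approximates what a producer does with a serializer class config.

```java
import java.util.HashMap;
import java.util.Map;

public class SerializerInstantiationSketch {

    /** Hypothetical stand-in for a schema registry client. */
    interface Registry {
        String name();
    }

    /** Hypothetical stand-in for a mock registry created in a test. */
    static final class MockRegistry implements Registry {
        public String name() {
            return "mock";
        }
    }

    /** Hypothetical stand-in for a serializer that depends on a registry. */
    static final class DemoSerializer {
        private Registry registry;

        /** No-arg constructor: the only one reflection can use here. */
        public DemoSerializer() {
        }

        /** Constructor a test uses to inject its own registry. */
        DemoSerializer(Registry registry) {
            this.registry = registry;
        }

        /** Builds a registry from config; fails if the URL is missing. */
        void configure(Map<String, Object> config) {
            Object url = config.get("schema.registry.url");
            if (url == null) {
                throw new IllegalStateException(
                        "Missing required configuration \"schema.registry.url\"");
            }
            registry = () -> "remote:" + url;
        }

        String registryName() {
            return registry.name();
        }
    }

    /**
     * Approximates a producer that is given a serializer class:
     * it creates a brand-new instance and configures it from the map.
     */
    static DemoSerializer createFromClass(Class<? extends DemoSerializer> type,
                                          Map<String, Object> config) {
        try {
            DemoSerializer serializer = type.getDeclaredConstructor().newInstance();
            serializer.configure(config);
            return serializer;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + type, e);
        }
    }

    public static void main(String[] args) {
        DemoSerializer injected = new DemoSerializer(new MockRegistry());
        Map<String, Object> config = new HashMap<>();

        // Passing only the class discards the injected instance, so the
        // new instance fails to configure without the URL.
        try {
            createFromClass(injected.getClass(), config);
        } catch (IllegalStateException e) {
            System.out.println("without url: " + e.getMessage());
        }

        // Adding the URL removes the error, but the new instance still
        // does not use the mock registry.
        config.put("schema.registry.url", "http://fake-url");
        DemoSerializer fresh = createFromClass(injected.getClass(), config);
        System.out.println("producer-created serializer uses: " + fresh.registryName());
        System.out.println("test-created serializer uses: " + injected.registryName());
    }
}
```

In the real test, the analogous move is the one described in the reply: call the mock-backed `avroSerializer` yourself and give the producer already-serialized `byte[]` values.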