Clint,

Your code looks fine and the output doesn't actually have any errors, but
you're not waiting for the message to be published before the program
exits. Try changing

producer.send(data);

to

producer.send(data).get();

to block until the message has been acked. If it runs and exits
cleanly, then you should be able to see the message with a consumer, e.g.
kafka-avro-console-consumer.
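
For example, the end of your main() could look something like this (just a
sketch reusing the producer and data variables from your code; you'd also
need imports for RecordMetadata and java.util.concurrent.ExecutionException):

    try {
        // Blocking on the returned Future surfaces any send error and
        // guarantees the broker has acked the record before main() exits.
        RecordMetadata metadata = producer.send(data).get();
        System.out.println("Wrote to " + metadata.topic() + "-"
                + metadata.partition() + " at offset " + metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        producer.close();   // flushes anything still buffered
    }

In a real application you'd normally keep the async send() and only close()
the producer on shutdown, but blocking here makes it easy to confirm the
message actually made it to the broker.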

The warning that you're seeing is due to the KafkaProducer's configuration
class not using the schema.registry.url setting; the same settings are
passed on to the serializers, which do use it. It incorrectly reports the
value as null due to a bug; I filed
https://issues.apache.org/jira/browse/KAFKA-2026 to address that.

By the way, for Confluent stuff that's not part of the Apache Kafka
repository, you might want to ask questions on this list instead:
https://groups.google.com/forum/#!forum/confluent-platform


On Tue, Mar 17, 2015 at 8:04 AM, Clint Mcneil <clintmcn...@gmail.com> wrote:

> Hi
>
> I can't get the Kafka/Avro serializer producer example to work.
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import java.util.Properties;
>
> /**
>  * Created by clint on 3/17/15.
>  */
> public class Confluent {
>
>     public static void  main (String[] args){
>
>         KafkaProducer<Object, Object> producer;
>         Properties propsKafka = new Properties();
>
>         propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
>         propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> io.confluent.kafka.serializers.KafkaAvroSerializer.class);
>         propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> io.confluent.kafka.serializers.KafkaAvroSerializer.class);
> propsKafka.put("schema.registry.url", "http://localhost:8081");
>         producer = new KafkaProducer<Object, Object>(propsKafka);
>
>         String key = "key1";
>         String userSchema = "{\"type\":\"record\"," +
>                 "\"name\":\"myrecord\"," +
>                 "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
>
>         Schema.Parser parser = new Schema.Parser();
>         Schema schema = parser.parse(userSchema);
>
>         GenericRecord avroRecord = new GenericData.Record(schema);
>         avroRecord.put("f1", "value4");
>
>         ProducerRecord<Object, Object> data = new ProducerRecord<Object,
> Object>("test", key , avroRecord);
>         producer.send(data);
>
>     }
> }
>
>
> The output is:
>
> Mar 17, 2015 5:00:31 PM org.apache.kafka.common.config.AbstractConfig
> logAll
> INFO: ProducerConfig values:
>     compression.type = none
>     metric.reporters = []
>     metadata.max.age.ms = 300000
>     metadata.fetch.timeout.ms = 60000
>     acks = 1
>     batch.size = 16384
>     reconnect.backoff.ms = 10
>     bootstrap.servers = [localhost:9092]
>     receive.buffer.bytes = 32768
>     retry.backoff.ms = 100
>     buffer.memory = 33554432
>     timeout.ms = 30000
>     key.serializer = class
> io.confluent.kafka.serializers.KafkaAvroSerializer
>     retries = 0
>     max.request.size = 1048576
>     block.on.buffer.full = true
>     value.serializer = class
> io.confluent.kafka.serializers.KafkaAvroSerializer
>     metrics.sample.window.ms = 30000
>     send.buffer.bytes = 131072
>     max.in.flight.requests.per.connection = 5
>     metrics.num.samples = 2
>     linger.ms = 0
>     client.id =
>
> Mar 17, 2015 5:00:32 PM org.apache.kafka.common.config.AbstractConfig
> logUnused
> WARNING: The configuration schema.registry.url = null was supplied but
> isn't a known config.
>
> Please help
>
> Thanks
>



-- 
Thanks,
Ewen
