Clint, Your code looks fine and the output doesn't actually have any errors, but you're also not waiting for the messages to be published. Try changing
producer.send(data); to producer.send(data).get(); to wait block until the message has been acked. If it runs and exits cleanly, then you should be able to see it using a consumer, e.g kafka-avro-console-consumer. The warning that you're seeing is due to the KafkaProducer's configuration class not using the schema.registry.url setting; the same settings are passed on to the serializers which do use it. It incorrectly reports the value as null due to a bug, I filed https://issues.apache.org/jira/browse/KAFKA-2026 to address that. By the way, for Confluent stuff that's not part of the Apache Kafka repository, you might want to ask questions on this list instead: https://groups.google.com/forum/#!forum/confluent-platform On Tue, Mar 17, 2015 at 8:04 AM, Clint Mcneil <clintmcn...@gmail.com> wrote: > Hi > > I can't get the Kafka/Avro serializer producer example to work. > > import org.apache.avro.Schema; > import org.apache.avro.generic.GenericData; > import org.apache.avro.generic.GenericRecord; > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.ProducerConfig; > import org.apache.kafka.clients.producer.ProducerRecord; > > import java.util.Properties; > > /** > * Created by clint on 3/17/15. > */ > public class Confluent { > > public static void main (String[] args){ > > KafkaProducer<Object, Object> producer; > Properties propsKafka = new Properties(); > > propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092"); > propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > io.confluent.kafka.serializers.KafkaAvroSerializer.class); > propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > io.confluent.kafka.serializers.KafkaAvroSerializer.class); > propsKafka.put("schema.registry.url", "http://localhost:8081"); > producer = new KafkaProducer<Object, Object>(propsKafka); > > String key = "key1"; > String userSchema = "{\"type\":\"record\"," + > "\"name\":\"myrecord\"," + > "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"; > > Schema.Parser parser = new Schema.Parser(); > Schema schema = parser.parse(userSchema); > > GenericRecord avroRecord = new GenericData.Record(schema); > avroRecord.put("f1", "value4"); > > ProducerRecord<Object, Object> data = new ProducerRecord<Object, > Object>("test", key , avroRecord); > producer.send(data); > > } > } > > > The output is: > > Mar 17, 2015 5:00:31 PM org.apache.kafka.common.config.AbstractConfig > logAll > INFO: ProducerConfig values: > compression.type = none > metric.reporters = [] > metadata.max.age.ms = 300000 > metadata.fetch.timeout.ms = 60000 > acks = 1 > batch.size = 16384 > reconnect.backoff.ms = 10 > bootstrap.servers = [localhost:9092] > receive.buffer.bytes = 32768 > retry.backoff.ms = 100 > buffer.memory = 33554432 > timeout.ms = 30000 > key.serializer = class > io.confluent.kafka.serializers.KafkaAvroSerializer > retries = 0 > max.request.size = 1048576 > block.on.buffer.full = true > value.serializer = class > io.confluent.kafka.serializers.KafkaAvroSerializer > metrics.sample.window.ms = 30000 > send.buffer.bytes = 131072 > max.in.flight.requests.per.connection = 5 > metrics.num.samples = 2 > linger.ms = 0 > client.id = > > Mar 17, 2015 5:00:32 PM org.apache.kafka.common.config.AbstractConfig > logUnused > WARNING: The configuration schema.registry.url = null was supplied but > isn't a known config. > > Please help > > Thanks > -- Thanks, Ewen