I was able to consume, using ssl and the schema registry. I however did not get the messages which where produced before I started the consuming, this is because it default to 'auto.offset.reset = latest', this is a Camel default, which you can change by adding 'autoOffsetReset=earliest' to the properties when creating the endpoint. Maybe that will solve your problem?
The route: public void configure() { from("kafka:192.168.99.100:9093?topic=" + TOPIC + "&" + "groupId=testgroup&" + "valueDeserializer="+CamelKafkaAvroDeserializer.class.getName()+"&" + "keyDeserializer="+ StringDeserializer.class.getName()+"&" + "brokers=192.168.99.100:9093&" + "securityProtocol=SSL&" + "sslKeyPassword=notsecret&" + "sslKeystoreLocation=/cert/client/client.keystore.jks&" + "sslKeystorePassword=notsecret&" + "sslTruststoreLocation=/cert/client/client.truststore.jks&" + "sslTruststorePassword=notsecret") .marshal(getCastor()) .to("file:target/kafkadata?noop=true"); } And the specific Deserializer to bypass the Schema Registry Url problem: /** * needed because the schema registry url can't be passed to camel */ public class CamelKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> { private static final List<String> SCHEMA_REGISTRY_URL = Collections.singletonList("http://192.168.99.100:8081"); @Override protected void configure(KafkaAvroDeserializerConfig config) { try { this.schemaRegistry = new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, Integer.MAX_VALUE); this.useSpecificAvroReader = true; } catch (ConfigException e) { throw new org.apache.kafka.common.config.ConfigException(e.getMessage()); } } @Override public void configure(Map<String, ?> configs, boolean isKey) { configure(null); } @Override public Object deserialize(String s, byte[] bytes) { return deserialize(bytes); } @Override public void close() { } } On Tue, Mar 29, 2016 at 5:56 PM Vanshul.Chawla <vanshul.cha...@target.com> wrote: > Sure Thanks. > > Producer works and even SSL params are working fine. > Consumer starts and doesn’t consume as such. > > Any code in documentation link will he helpful. > > -----Original Message----- > From: Gerard Klijs [mailto:gerard.kl...@dizzit.com] > Sent: Tuesday, March 29, 2016 10:38 AM > To: users@camel.apache.org; Andrea Cosentino <ancosen1...@yahoo.com> > Subject: Re: Camel 2.17 Kafka SSL Producer and Consumer > > I already posted some code used with Camel 2.17 release candidate to > produce topics, using ssl and the schema registry. We will actually try the > consumer this sprint, so I could post some example in this thread if we get > it to work. > > When producing I get the following warnings (which does not seem 'real' > warnings): > [nl.rabobank.beb.MainApp.main()] KafkaProducer WARN > block.on.buffer.full config is deprecated and will be removed soon. > Please use max.block.ms > [nl.rabobank.beb.MainApp.main()] KafkaProducer WARN > metadata.fetch.timeout.ms config is deprecated and will be removed soon. > Please use max.block.ms > [nl.rabobank.beb.MainApp.main()] KafkaProducer WARN > timeout.ms config is deprecated and will be removed soon. Please use > request.timeout.ms > [nl.rabobank.beb.MainApp.main()] ProducerConfig WARN The > configuration sasl.kerberos.ticket.renew.window.factor = 0.8 was supplied > but isn't a known config. > [nl.rabobank.beb.MainApp.main()] ProducerConfig WARN The > configuration sasl.kerberos.kinit.cmd = /usr/bin/kinit was supplied but > isn't a known config. > [nl.rabobank.beb.MainApp.main()] ProducerConfig WARN The > configuration sasl.kerberos.ticket.renew.jitter = 0.05 was supplied but > isn't a known config. > [nl.rabobank.beb.MainApp.main()] ProducerConfig WARN The > configuration max.block.ms = 60000 was supplied but isn't a known config. > [nl.rabobank.beb.MainApp.main()] ProducerConfig WARN The > configuration sasl.kerberos.min.time.before.relogin = 60000 was supplied > but isn't a known config. > [nl.rabobank.beb.MainApp.main()] ProducerConfig WARN The > configuration request.timeout.ms = 30000 was supplied but isn't a known > config. > > On Tue, Mar 29, 2016 at 2:50 PM Vanshul.Chawla <vanshul.cha...@target.com> > wrote: > > > Can I please get a spring DL sample for consumer and producer uri's > > > > Producer gives this warning but works. Consumer doesn’t work. > > > > -----Original Message----- > > From: Andrea Cosentino [mailto:ancosen1...@yahoo.com.INVALID] > > Sent: Tuesday, March 29, 2016 7:36 AM > > To: users@camel.apache.org > > Subject: Re: Camel 2.17 Kafka SSL Producer and Consumer > > > > We migrate Kafka to 0.9.x version and pass completely to the Java API. > > > > The configuration changed in this upgrade. > > -- > > Andrea Cosentino > > ---------------------------------- > > Apache Camel PMC Member > > Apache Karaf Committer > > Apache Servicemix Committer > > Email: ancosen1...@yahoo.com > > Twitter: @oscerd2 > > Github: oscerd > > > > > > > > On Tuesday, March 29, 2016 2:32 PM, Vanshul. Chawla < > > vanshul.cha...@target.com> wrote: > > Hello All, > > > > When I start my producer and consumer for a SSL enabled topic, I get > > following warning and it doesn't produce or consume anything. > > > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration ssl.truststore.password = changeit was supplied but > > isn't a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration ssl.keymanager.algorithm = SunX509 was supplied but > > isn't a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration sasl.kerberos.ticket.renew.jitter = 0.05 was supplied > > but isn't a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration sasl.kerberos.ticket.renew.window.factor = 0.8 was > > supplied but isn't a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration sasl.kerberos.kinit.cmd = /usr/bin/kinit was supplied > > but isn't a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 was > > supplied but isn't a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration ssl.keystore.location = C:\Program > > Files\Java\jdk1.7.0_75\jre\lib\security\cacerts was supplied but isn't > > a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration ssl.truststore.location = C:\Program > > Files\Java\jdk1.7.0_75\jre\lib\security\cacerts was supplied but isn't > > a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration ssl.keystore.password = changeit was supplied but isn't > > a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration ssl.protocol = TLS was supplied but isn't a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration sasl.kerberos.min.time.before.relogin = 60000 was > > supplied but isn't a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration ssl.trustmanager.algorithm = PKIX was supplied but isn't > > a known config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration ssl.truststore.type = JKS was supplied but isn't a known > > config. > > [pache.camel.spring.Main.main()] ConsumerConfig WARN The > > configuration ssl.keystore.type = JKS was supplied but isn't a known > config. > > > > What can be issue here? > > > > Vanshul > > > > >