I was able to consume using SSL and the schema registry. However, I did not
get the messages that were produced before I started consuming. This is
because the consumer defaults to 'auto.offset.reset = latest'. This is a
Camel default, which you can change by adding 'autoOffsetReset=earliest' to
the properties when creating the endpoint. Maybe that will solve your problem?
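
As a minimal sketch of that change (not the exact code I ran), the option can
be appended to the endpoint URI like any other parameter. The class name,
helper method and topic name below are made up for illustration; only
'autoOffsetReset=earliest' is the relevant part.

```java
public class AutoOffsetResetSketch {

    // Hypothetical helper: builds a camel-kafka consumer URI in the same
    // style as the route in this mail, with autoOffsetReset overridden.
    static String consumerUri(String brokers, String topic, String groupId) {
        return "kafka:" + brokers + "?topic=" + topic
                + "&groupId=" + groupId
                + "&autoOffsetReset=earliest";
    }

    public static void main(String[] args) {
        // "mytopic" is a placeholder topic name.
        System.out.println(consumerUri("192.168.99.100:9093", "mytopic", "testgroup"));
    }
}
```

Note that 'earliest' only takes effect when the consumer group has no
committed offset yet, so a group that has already consumed will continue from
its last committed position.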

The route:

public void configure() {
    from("kafka:192.168.99.100:9093?topic=" + TOPIC + "&"
            + "groupId=testgroup&"
            + "valueDeserializer=" + CamelKafkaAvroDeserializer.class.getName() + "&"
            + "keyDeserializer="+ StringDeserializer.class.getName()+"&"
            + "brokers=192.168.99.100:9093&"
            + "securityProtocol=SSL&"
            + "sslKeyPassword=notsecret&"
            + "sslKeystoreLocation=/cert/client/client.keystore.jks&"
            + "sslKeystorePassword=notsecret&"
            + "sslTruststoreLocation=/cert/client/client.truststore.jks&"
            + "sslTruststorePassword=notsecret")
    .marshal(getCastor())
    .to("file:target/kafkadata?noop=true");
}


And the specific Deserializer to bypass the Schema Registry Url problem:

/**
 * needed because the schema registry url can't be passed to camel
 */
public class CamelKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
        implements Deserializer<Object> {

    private static final List<String> SCHEMA_REGISTRY_URL =
            Collections.singletonList("http://192.168.99.100:8081");

    @Override
    protected void configure(KafkaAvroDeserializerConfig config) {
        try {
            this.schemaRegistry = new CachedSchemaRegistryClient(
                    SCHEMA_REGISTRY_URL, Integer.MAX_VALUE);
            this.useSpecificAvroReader = true;
        } catch (ConfigException e) {
            throw new org.apache.kafka.common.config.ConfigException(e.getMessage());
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(null);
    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        return deserialize(bytes);
    }

    @Override
    public void close() {
    }
}
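
The URL is hardcoded in the deserializer above. One possible variation, shown
only as a sketch, is to read it from a JVM system property and fall back to
the hardcoded value. The property name 'schema.registry.url' and the class
name are assumptions made for this example, not something Camel or the
deserializer above defines.

```java
import java.util.Collections;
import java.util.List;

public class SchemaRegistryUrlSketch {

    // Hypothetical property name, chosen for this sketch only.
    static final String PROPERTY = "schema.registry.url";

    // Same address as the hardcoded value in the deserializer above.
    static final String DEFAULT_URL = "http://192.168.99.100:8081";

    // Returns the URL from -Dschema.registry.url=... if set, else the default.
    static List<String> schemaRegistryUrls() {
        return Collections.singletonList(System.getProperty(PROPERTY, DEFAULT_URL));
    }

    public static void main(String[] args) {
        System.out.println(schemaRegistryUrls());
    }
}
```

The returned list could then be passed to the CachedSchemaRegistryClient
constructor in place of the SCHEMA_REGISTRY_URL constant.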


On Tue, Mar 29, 2016 at 5:56 PM Vanshul.Chawla <vanshul.cha...@target.com>
wrote:

> Sure Thanks.
>
> Producer works and even SSL params are working fine.
> Consumer starts and doesn’t consume as such.
>
> Any code in documentation link will be helpful.
>
> -----Original Message-----
> From: Gerard Klijs [mailto:gerard.kl...@dizzit.com]
> Sent: Tuesday, March 29, 2016 10:38 AM
> To: users@camel.apache.org; Andrea Cosentino <ancosen1...@yahoo.com>
> Subject: Re: Camel 2.17 Kafka SSL Producer and Consumer
>
> I already posted some code used with Camel 2.17 release candidate to
> produce topics, using ssl and the schema registry. We will actually try the
> consumer this sprint, so I could post some example in this thread if we get
> it to work.
>
> When producing I get the following warnings (which do not seem to be 'real'
> warnings):
> [nl.rabobank.beb.MainApp.main()] KafkaProducer                  WARN
>  block.on.buffer.full config is deprecated and will be removed soon.
> Please use max.block.ms
> [nl.rabobank.beb.MainApp.main()] KafkaProducer                  WARN
> metadata.fetch.timeout.ms config is deprecated and will be removed soon.
> Please use max.block.ms
> [nl.rabobank.beb.MainApp.main()] KafkaProducer                  WARN
> timeout.ms config is deprecated and will be removed soon. Please use
> request.timeout.ms
> [nl.rabobank.beb.MainApp.main()] ProducerConfig                 WARN  The
> configuration sasl.kerberos.ticket.renew.window.factor = 0.8 was supplied
> but isn't a known config.
> [nl.rabobank.beb.MainApp.main()] ProducerConfig                 WARN  The
> configuration sasl.kerberos.kinit.cmd = /usr/bin/kinit was supplied but
> isn't a known config.
> [nl.rabobank.beb.MainApp.main()] ProducerConfig                 WARN  The
> configuration sasl.kerberos.ticket.renew.jitter = 0.05 was supplied but
> isn't a known config.
> [nl.rabobank.beb.MainApp.main()] ProducerConfig                 WARN  The
> configuration max.block.ms = 60000 was supplied but isn't a known config.
> [nl.rabobank.beb.MainApp.main()] ProducerConfig                 WARN  The
> configuration sasl.kerberos.min.time.before.relogin = 60000 was supplied
> but isn't a known config.
> [nl.rabobank.beb.MainApp.main()] ProducerConfig                 WARN  The
> configuration request.timeout.ms = 30000 was supplied but isn't a known
> config.
>
> On Tue, Mar 29, 2016 at 2:50 PM Vanshul.Chawla <vanshul.cha...@target.com>
> wrote:
>
> > Can I please get a Spring DSL sample for the consumer and producer URIs?
> >
> > Producer gives this warning but works. Consumer doesn’t work.
> >
> > -----Original Message-----
> > From: Andrea Cosentino [mailto:ancosen1...@yahoo.com.INVALID]
> > Sent: Tuesday, March 29, 2016 7:36 AM
> > To: users@camel.apache.org
> > Subject: Re: Camel 2.17 Kafka SSL Producer and Consumer
> >
> > We migrated Kafka to version 0.9.x and moved completely to the Java API.
> >
> > The configuration changed in this upgrade.
> >  --
> > Andrea Cosentino
> > ----------------------------------
> > Apache Camel PMC Member
> > Apache Karaf Committer
> > Apache Servicemix Committer
> > Email: ancosen1...@yahoo.com
> > Twitter: @oscerd2
> > Github: oscerd
> >
> >
> >
> > On Tuesday, March 29, 2016 2:32 PM, Vanshul. Chawla <
> > vanshul.cha...@target.com> wrote:
> > Hello All,
> >
> > When I start my producer and consumer for a SSL enabled topic, I get
> > following warning and it doesn't produce or consume anything.
> >
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration ssl.truststore.password = changeit was supplied but
> > isn't a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration ssl.keymanager.algorithm = SunX509 was supplied but
> > isn't a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration sasl.kerberos.ticket.renew.jitter = 0.05 was supplied
> > but isn't a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration sasl.kerberos.ticket.renew.window.factor = 0.8 was
> > supplied but isn't a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration sasl.kerberos.kinit.cmd = /usr/bin/kinit was supplied
> > but isn't a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 was
> > supplied but isn't a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration ssl.keystore.location = C:\Program
> > Files\Java\jdk1.7.0_75\jre\lib\security\cacerts was supplied but isn't
> > a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration ssl.truststore.location = C:\Program
> > Files\Java\jdk1.7.0_75\jre\lib\security\cacerts was supplied but isn't
> > a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration ssl.keystore.password = changeit was supplied but isn't
> > a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration ssl.protocol = TLS was supplied but isn't a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration sasl.kerberos.min.time.before.relogin = 60000 was
> > supplied but isn't a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration ssl.trustmanager.algorithm = PKIX was supplied but isn't
> > a known config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration ssl.truststore.type = JKS was supplied but isn't a known
> > config.
> > [pache.camel.spring.Main.main()] ConsumerConfig                 WARN  The
> > configuration ssl.keystore.type = JKS was supplied but isn't a known
> > config.
> >
> > What can be issue here?
> >
> > Vanshul
> >
> >
>
