Raja, Issue is the SSL properties(ssl.*.*) are not reflected to Kafka consumer. Could you please share the complete project ?
Thanks, Chaitanya On Wed, Dec 7, 2016 at 7:39 AM, Raja.Aravapalli <raja.aravapa...@target.com> wrote: > Hi Chaitanya, > > > > Any other thoughts on how I can fix this ?? > > > > Are Apex doesn’t yet support SSL secured topics ? > > > > > > Thanks a lot. > > > > Regards, > > Raja. > > > > *From: *"Raja.Aravapalli" <raja.aravapa...@target.com> > *Reply-To: *"users@apex.apache.org" <users@apex.apache.org> > *Date: *Tuesday, December 6, 2016 at 5:32 PM > > *To: *"users@apex.apache.org" <users@apex.apache.org> > *Subject: *Re: [EXTERNAL] Re: KafkaSinglePortInputOperator > > > > > > > > I added the below line as said… I cannot see any exceptions also!!!! > > > > Still nothing is happening L > > > > I am not sure, why these below as always showing as null… even though I > set them in my Application.java class!! Any help on how to set these > properties ??? > > > > ssl.keystore.location = null > > ssl.truststore.location = null > > ssl.keystore.password = null > > > > > > Thanks a lot in advance. > > > > Regards, > > Raja. > > > > *From: *Chaitanya Chebolu <chaita...@datatorrent.com> > *Reply-To: *"users@apex.apache.org" <users@apex.apache.org> > *Date: *Tuesday, December 6, 2016 at 5:17 PM > *To: *"users@apex.apache.org" <users@apex.apache.org> > *Subject: *Re: [EXTERNAL] Re: KafkaSinglePortInputOperator > > > > Raja, > > > > Please set the consumerProps to the KafkaSinglePortInputOperator. > > Add the below line in your application: > > KafkaSinglePortInputOperator in = dag.addOperator("in", new > KafkaSinglePortInputOperator()); > > ------ > > in.setConsumerProps(props); > > > > Please let me know, if you are still facing issues. > > > > Regards, > > Chaitanya > > > > > > > > On Tue, Dec 6, 2016 at 5:00 PM, Raja.Aravapalli < > raja.aravapa...@target.com> wrote: > > > > Find below the log I am observing: > > > > 2016-12-06 05:17:37,264 INFO kafka.AbstractKafkaInputOperator > (AbstractKafkaInputOperator.java:initPartitioner(311)) - Initialize > Partitioner > > 2016-12-06 05:17:37,265 INFO kafka.AbstractKafkaInputOperator > (AbstractKafkaInputOperator.java:initPartitioner(324)) - Actual > Partitioner is class org.apache.apex.malhar.kafka.OneToOnePartitioner > > 2016-12-06 05:17:37,280 INFO consumer.ConsumerConfig > (AbstractConfig.java:logAll(165)) - ConsumerConfig values: > > metric.reporters = [] > > metadata.max.age.ms = 300000 > > value.deserializer = class org.apache.kafka.common. > serialization.ByteArrayDeserializer > > group.id = org.apache.apex.malhar.kafka. > AbstractKafkaInputOperatorMETA_GROUP > > partition.assignment.strategy = [org.apache.kafka.clients. > consumer.RangeAssignor] > > reconnect.backoff.ms = 50 > > sasl.kerberos.ticket.renew.window.factor = 0.8 > > max.partition.fetch.bytes = 1048576 > > bootstrap.servers = [10.66.137.116:9093] > > retry.backoff.ms = 100 > > sasl.kerberos.kinit.cmd = /usr/bin/kinit > > sasl.kerberos.service.name = null > > sasl.kerberos.ticket.renew.jitter = 0.05 > > ssl.keystore.type = JKS > > ssl.trustmanager.algorithm = PKIX > > enable.auto.commit = false > > ssl.key.password = null > > fetch.max.wait.ms = 500 > > sasl.kerberos.min.time.before.relogin = 60000 > > connections.max.idle.ms = 540000 > > ssl.truststore.password = null > > session.timeout.ms = 30000 > > metrics.num.samples = 2 > > client.id = > > ssl.endpoint.identification.algorithm = null > > key.deserializer = class org.apache.kafka.common. > serialization.ByteArrayDeserializer > > ssl.protocol = TLS > > check.crcs = true > > request.timeout.ms = 40000 > > ssl.provider = null > > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > > ssl.keystore.location = null > > heartbeat.interval.ms = 3000 > > auto.commit.interval.ms = 5000 > > receive.buffer.bytes = 32768 > > ssl.cipher.suites = null > > ssl.truststore.type = JKS > > security.protocol = PLAINTEXT > > ssl.truststore.location = null > > ssl.keystore.password = null > > ssl.keymanager.algorithm = SunX509 > > metrics.sample.window.ms = 30000 > > fetch.min.bytes = 1024 > > send.buffer.bytes = 131072 > > auto.offset.reset = latest > > > > 2016-12-06 05:17:37,385 INFO utils.AppInfoParser > (AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0 > > 2016-12-06 05:17:37,385 INFO utils.AppInfoParser > (AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a > > > > > > > > Regards, > > Raja. > > > > *From: *Chaitanya Chebolu <chaita...@datatorrent.com> > *Reply-To: *"users@apex.apache.org" <users@apex.apache.org> > *Date: *Tuesday, December 6, 2016 at 4:28 PM > *To: *"users@apex.apache.org" <users@apex.apache.org> > *Subject: *[EXTERNAL] Re: KafkaSinglePortInputOperator > > > > Hi Raja, > > > > Could you please share the Application Master logs and Kafka operator > container logs. > > > > Regards, > > Chaitanya > > > > On Tue, Dec 6, 2016 at 4:17 PM, Raja.Aravapalli < > raja.aravapa...@target.com> wrote: > > > > Hi Team, > > > > I am using “KafkaSinglePortInputOperator” to connect to a SSL Secured > topic in Kafka 0.9!! > > > > Unfortunately… my apex application is not going to “RUNNING” state…!! Only > staying in ACCEPTED State and then going into FAILED statie!! I don’t see > much information in the logs…!! L > > > > Can someone please help fix the issue…. We have immediate need to read > messages from kafka 0.9 SSL configured topics… > > > > Please advise! > > > > > > Thanks very much in advance. > > > > > > Regards, > > Raja. > > > > >