I added the line below as suggested, but I cannot see any exceptions either.

Still nothing is happening. ☹

I am not sure why the properties below are always showing as null, even though I
set them in my Application.java class. Any help on how to set these properties?

ssl.keystore.location = null
ssl.truststore.location = null
ssl.keystore.password = null
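
For reference, a minimal sketch of roughly how I expect these to be supplied (the
paths, passwords and values below are placeholders, not my actual Application.java
code):

  Properties props = new Properties();    // java.util.Properties
  props.put("security.protocol", "SSL");  // placeholder: switch the consumer to SSL
  props.put("ssl.keystore.location", "/path/to/client.keystore.jks");     // placeholder path
  props.put("ssl.keystore.password", "keystore-password");                // placeholder password
  props.put("ssl.truststore.location", "/path/to/client.truststore.jks"); // placeholder path
  props.put("ssl.truststore.password", "truststore-password");            // placeholder password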


Thanks a lot in advance.

Regards,
Raja.

From: Chaitanya Chebolu <chaita...@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Tuesday, December 6, 2016 at 5:17 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: [EXTERNAL] Re: KafkaSinglePortInputOperator

Raja,

   Please set the consumerProps on the KafkaSinglePortInputOperator.
   Add the lines below in your application:
  KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
  ------
  in.setConsumerProps(props);
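
   For example, put together in populateDAG() it could look roughly like the sketch
below (broker address, topic name, keystore paths and passwords are placeholders;
setClusters()/setTopics() are the usual broker/topic setters on the operator,
adjust them to your setup):

  // Assumed imports: java.util.Properties, org.apache.hadoop.conf.Configuration,
  // com.datatorrent.api.DAG, org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
    in.setClusters("10.66.137.116:9093");   // placeholder broker address
    in.setTopics("my-ssl-topic");           // placeholder topic name

    Properties props = new Properties();
    props.put("security.protocol", "SSL");
    props.put("ssl.keystore.location", "/path/to/client.keystore.jks");      // placeholder
    props.put("ssl.keystore.password", "keystore-password");                 // placeholder
    props.put("ssl.truststore.location", "/path/to/client.truststore.jks");  // placeholder
    props.put("ssl.truststore.password", "truststore-password");             // placeholder
    in.setConsumerProps(props);

    // add the rest of the DAG (downstream operators and streams) as usual
  }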

 Please let me know if you are still facing issues.

Regards,
Chaitanya



On Tue, Dec 6, 2016 at 5:00 PM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:

Find below the log I am observing:

2016-12-06 05:17:37,264 INFO  kafka.AbstractKafkaInputOperator (AbstractKafkaInputOperator.java:initPartitioner(311)) - Initialize Partitioner
2016-12-06 05:17:37,265 INFO  kafka.AbstractKafkaInputOperator (AbstractKafkaInputOperator.java:initPartitioner(324)) - Actual Partitioner is class org.apache.apex.malhar.kafka.OneToOnePartitioner
2016-12-06 05:17:37,280 INFO  consumer.ConsumerConfig (AbstractConfig.java:logAll(165)) - ConsumerConfig values:
                metric.reporters = []
                metadata.max.age.ms = 300000
                value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
                group.id = org.apache.apex.malhar.kafka.AbstractKafkaInputOperatorMETA_GROUP
                partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
                reconnect.backoff.ms = 50
                sasl.kerberos.ticket.renew.window.factor = 0.8
                max.partition.fetch.bytes = 1048576
                bootstrap.servers = [10.66.137.116:9093]
                retry.backoff.ms = 100
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                ssl.keystore.type = JKS
                ssl.trustmanager.algorithm = PKIX
                enable.auto.commit = false
                ssl.key.password = null
                fetch.max.wait.ms = 500
                sasl.kerberos.min.time.before.relogin = 60000
                connections.max.idle.ms = 540000
                ssl.truststore.password = null
                session.timeout.ms = 30000
                metrics.num.samples = 2
                client.id =
                ssl.endpoint.identification.algorithm = null
                key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
                ssl.protocol = TLS
                check.crcs = true
                request.timeout.ms = 40000
                ssl.provider = null
                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                ssl.keystore.location = null
                heartbeat.interval.ms = 3000
                auto.commit.interval.ms = 5000
                receive.buffer.bytes = 32768
                ssl.cipher.suites = null
                ssl.truststore.type = JKS
                security.protocol = PLAINTEXT
                ssl.truststore.location = null
                ssl.keystore.password = null
                ssl.keymanager.algorithm = SunX509
                metrics.sample.window.ms = 30000
                fetch.min.bytes = 1024
                send.buffer.bytes = 131072
                auto.offset.reset = latest

2016-12-06 05:17:37,385 INFO  utils.AppInfoParser (AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0
2016-12-06 05:17:37,385 INFO  utils.AppInfoParser (AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a



Regards,
Raja.

From: Chaitanya Chebolu <chaita...@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Tuesday, December 6, 2016 at 4:28 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: [EXTERNAL] Re: KafkaSinglePortInputOperator

Hi Raja,

  Could you please share the Application Master logs and the Kafka operator container logs?

Regards,
Chaitanya

On Tue, Dec 6, 2016 at 4:17 PM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:

Hi Team,

I am using “KafkaSinglePortInputOperator” to connect to an SSL-secured topic in
Kafka 0.9.

Unfortunately, my Apex application is not going to the RUNNING state. It stays in
the ACCEPTED state, then goes into the FAILED state, and I don’t see much
information in the logs. ☹

Can someone please help fix the issue? We have an immediate need to read messages
from Kafka 0.9 SSL-configured topics.

Please advise!


Thanks very much in advance.


Regards,
Raja.

