Thanks, I'll check it out.
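
For reference, the Kafka security properties from David's message below could be assembled in Java roughly as follows. This is only a sketch: the class name KafkaSecurityProps and the build helper are made up for illustration, and the user name and password are the placeholders from the quoted config. bootstrap.servers, group.id and the deserialization schema would still have to be supplied before passing the Properties to the FlinkKafkaConsumerXX constructor.

```java
import java.util.Properties;

public class KafkaSecurityProps {

    // Builds only the security-related consumer properties listed in the
    // quoted message (SASL/PLAIN over SSL). KafkaSecurityProps and build()
    // are hypothetical names; the credentials are placeholders.
    public static Properties build(String username, String password) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        // JAAS entry passed inline instead of through a separate JAAS file.
        props.setProperty(
            "sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("myuser", "XXXX");
        // Assumption: the caller adds bootstrap.servers and group.id here and
        // then hands props to the Flink Kafka consumer constructor.
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Keeping the security settings in one helper makes it easy to compare them against the ConsumerConfig values that Kafka prints at startup.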

On Thu, Aug 29, 2019 at 1:08 PM David Morin <morin.david....@gmail.com>
wrote:

> Vishwas,
>
> Here is a config that works on my Kerberized cluster (Flink on YARN).
> I hope this will help you.
>
> Flink conf:
> security.kerberos.login.use-ticket-cache: true
> security.kerberos.login.keytab: /home/myuser/myuser.keytab
> security.kerberos.login.principal: myuser@XXXX
> security.kerberos.login.contexts: Client
>
> Properties related to security passed as argument of the
> FlinkKafkaConsumerXX constructor:
> sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule
> required username=\"myuser\" password=\"XXXX\";"
> sasl.mechanism=PLAIN
> security.protocol=SASL_SSL
>
> On Thu, Aug 29, 2019 at 6:20 PM Vishwas Siravara <vsirav...@gmail.com>
> wrote:
>
>> Hey David,
>> My consumers are registered; here is the debug log. The problem is that the
>> broker does not belong to me, so I can’t see what is going on there. But
>> this is a new consumer group, so there is no state yet.
>>
>>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
>> Consumer subtask 0 will start reading the following 40 partitions from the 
>> committed group offsets in Kafka: 
>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, 
>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>>
>> On Thu, Aug 29, 2019 at 11:39 AM David Morin <morin.david....@gmail.com>
>> wrote:
>>
>>> Hello Vishwas,
>>>
>>> You can use a keytab if you prefer. You generate a keytab for your user
>>> and then you can reference it in the Flink configuration.
>>> Then this keytab will be handled by Flink in a secure way and a TGT will
>>> be created based on this keytab.
>>> However, the Kerberos login itself seems to be working.
>>> Did you check the Kafka logs on the broker side?
>>> Or did you check consumer offsets with the Kafka tools in order to validate
>>> that consumers are registered on the different partitions of your topic?
>>> You could try to switch to a different groupid for your consumer group
>>> in order to force parallel consumption.
>>>
>>> On Thu, Aug 29, 2019 at 9:57 AM Vishwas Siravara <vsirav...@gmail.com>
>>> wrote:
>>>
>>>> I see this log as well, but I can't see any messages. I know for a
>>>> fact that the topic I am subscribed to has messages, as I checked with a
>>>> simple Java consumer with a different group.
>>>>
>>>>
>>>>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
>>>> Consumer subtask 0 will start reading the following 40 partitions from the 
>>>> committed group offsets in Kafka: 
>>>> [KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=22}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=21}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=20}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=19}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=26}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=25}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=24}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=23}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=30}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=29}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=28}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=27}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=34}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=33}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=32}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=31}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=38}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=37}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=36}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=35}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=39}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=2}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=1}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=0}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=6}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=5}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=4}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=3}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=10}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=9}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=8}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=7}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=14}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=13}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=12}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=11}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=18}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=17}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=16}, 
>>>> KafkaTopicPartition{topic='gbl_auth_raw_occ_c', partition=15}]
>>>>
>>>>
>>>> On Thu, Aug 29, 2019 at 2:02 AM Vishwas Siravara <vsirav...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi guys,
>>>>> I am using Kerberos for my Kafka source. I pass the JAAS config and
>>>>> krb5.conf in env.java.opts: -Dconfig.resource=qa.conf
>>>>> -Djava.library.path=/usr/mware/SimpleAPI/voltage-simple-api-java-05.12.0000-Linux-x86_64-64b-r234867/lib/
>>>>> -Djava.security.auth.login.config=/home/was/Jaas/kafka-jaas.conf
>>>>> -Djava.security.krb5.conf=/home/was/Jaas/krb5.conf
>>>>>
>>>>> When I look at debug logs I see that the consumer was created with the
>>>>> following properties.
>>>>>
>>>>> 2019-08-29 06:49:18,298 INFO  
>>>>> org.apache.kafka.clients.consumer.ConsumerConfig              - 
>>>>> ConsumerConfig values:
>>>>>         auto.commit.interval.ms = 5000
>>>>>         auto.offset.reset = latest
>>>>>         bootstrap.servers = [sl73oprdbd018.visa.com:9092]
>>>>>         check.crcs = true
>>>>>         client.id = consumer-2
>>>>>         connections.max.idle.ms = 540000
>>>>>         enable.auto.commit = true
>>>>>         exclude.internal.topics = true
>>>>>         fetch.max.bytes = 52428800
>>>>>         fetch.max.wait.ms = 500
>>>>>         fetch.min.bytes = 1
>>>>>
>>>>>
>>>>>         group.id = flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
>>>>>         heartbeat.interval.ms = 3000
>>>>>         interceptor.classes = null
>>>>>         key.deserializer = class 
>>>>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>>>         max.partition.fetch.bytes = 1048576
>>>>>         max.poll.interval.ms = 300000
>>>>>         max.poll.records = 500
>>>>>         metadata.max.age.ms = 300000
>>>>>         metric.reporters = []
>>>>>         metrics.num.samples = 2
>>>>>         metrics.recording.level = INFO
>>>>>         metrics.sample.window.ms = 30000
>>>>>         partition.assignment.strategy = [class 
>>>>> org.apache.kafka.clients.consumer.RangeAssignor]
>>>>>         receive.buffer.bytes = 65536
>>>>>         reconnect.backoff.ms = 50
>>>>>         request.timeout.ms = 305000
>>>>>         retry.backoff.ms = 100
>>>>>         sasl.jaas.config = null
>>>>>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>>>         sasl.kerberos.min.time.before.relogin = 60000
>>>>>         sasl.kerberos.service.name = null
>>>>>         sasl.kerberos.ticket.renew.jitter = 0.05
>>>>>         sasl.kerberos.ticket.renew.window.factor = 0.8
>>>>>         sasl.mechanism = GSSAPI
>>>>>         security.protocol = SASL_PLAINTEXT
>>>>>         send.buffer.bytes = 131072
>>>>>         session.timeout.ms = 10000
>>>>>         ssl.cipher.suites = null
>>>>>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>>>         ssl.endpoint.identification.algorithm = null
>>>>>         ssl.key.password = null
>>>>>         ssl.keymanager.algorithm = SunX509
>>>>>         ssl.keystore.location = null
>>>>>         ssl.keystore.password = null
>>>>>         ssl.keystore.type = JKS
>>>>>         ssl.protocol = TLS
>>>>>         ssl.provider = null
>>>>>         ssl.secure.random.implementation = null
>>>>>         ssl.trustmanager.algorithm = PKIX
>>>>>         ssl.truststore.location = null
>>>>>         ssl.truststore.password = null
>>>>>         ssl.truststore.type = JKS
>>>>>         value.deserializer = class 
>>>>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>>>>
>>>>>
>>>>> I can also see that the Kerberos login is working fine. Here is the log
>>>>> for it:
>>>>>
>>>>>
>>>>>
>>>>> 2019-08-29 06:49:18,312 INFO  
>>>>> org.apache.kafka.common.security.authenticator.AbstractLogin  - 
>>>>> Successfully logged in.
>>>>> 2019-08-29 06:49:18,313 INFO  
>>>>> org.apache.kafka.common.security.kerberos.KerberosLogin       - 
>>>>> [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT refresh 
>>>>> thread started.
>>>>> 2019-08-29 06:49:18,314 INFO  
>>>>> org.apache.kafka.common.security.kerberos.KerberosLogin       - 
>>>>> [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT valid 
>>>>> starting at: Thu Aug 29 06:49:18 GMT 2019
>>>>> 2019-08-29 06:49:18,314 INFO  
>>>>> org.apache.kafka.common.security.kerberos.KerberosLogin       - 
>>>>> [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT expires: 
>>>>> Thu Aug 29 16:49:18 GMT 2019
>>>>> 2019-08-29 06:49:18,315 INFO  
>>>>> org.apache.kafka.common.security.kerberos.KerberosLogin       - 
>>>>> [Principal=kafka/sl73rspapd035.visa....@corpdev.visa.com]: TGT refresh 
>>>>> sleeping until: Thu Aug 29 15:00:10 GMT 2019
>>>>> 2019-08-29 06:49:18,316 WARN  
>>>>> org.apache.kafka.clients.consumer.ConsumerConfig              - The 
>>>>> configuration 'zookeeper.connect' was supplied but isn't a known config.
>>>>> 2019-08-29 06:49:18,316 INFO  org.apache.kafka.common.utils.AppInfoParser 
>>>>>                   - Kafka version : 0.10.2.0
>>>>> 2019-08-29 06:49:18,316 INFO  org.apache.kafka.common.utils.AppInfoParser 
>>>>>                   - Kafka commitId : 576d93a8dc0cf421
>>>>>
>>>>>
>>>>> I then see this log :
>>>>>
>>>>> INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - 
>>>>> Marking the coordinator sl73oprdbd017.visa.com:9092 (id: 2147482633 rack: 
>>>>> null) dead for group flink-AIP-XX-druid-List(gbl_auth_raw_occ_c)
>>>>>
>>>>>
>>>>>
>>>>> *The problem is that I do not see any error log, but no data is being
>>>>> processed by the consumer, and it has been a nightmare to debug.*
>>>>>
>>>>>
>>>>> Thanks for all the help.
>>>>>
>>>>>
>>>>> Thanks, Vishwas
>>>>>
>>>>>
