[ https://issues.apache.org/jira/browse/KAFKA-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706703#comment-16706703 ]
ASF GitHub Bot commented on KAFKA-4544: --------------------------------------- omkreddy closed pull request #5660: KAFKA-4544: Add system tests for delegation token based authentication URL: https://github.com/apache/kafka/pull/5660 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 65c9fa589c0..dfbec9f83da 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -60,8 +60,9 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService) def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True, message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH, client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None, - enable_systest_events=False, stop_timeout_sec=30, print_timestamp=False, - isolation_level="read_uncommitted", jaas_override_variables=None): + enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False, + isolation_level="read_uncommitted", jaas_override_variables=None, + kafka_opts_override="", client_prop_file_override=""): """ Args: context: standard context @@ -83,7 +84,8 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro print_timestamp if True, print each message's timestamp as well isolation_level How to handle transactional messages. jaas_override_variables A dict of variables to be used in the jaas.conf template file - + kafka_opts_override Override parameters of the KAFKA_OPTS environment variable + client_prop_file_override Override client.properties file used by the consumer """ JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), root=ConsoleConsumer.PERSISTENT_ROOT) @@ -116,6 +118,9 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro self.print_timestamp = print_timestamp self.jaas_override_variables = jaas_override_variables or {} + self.kafka_opts_override = kafka_opts_override + self.client_prop_file_override = client_prop_file_override + def prop_file(self, node): """Return a string which can be used to create a configuration file appropriate for the given node.""" @@ -134,6 +139,7 @@ def prop_file(self, node): prop_file += str(self.security_config) return prop_file + def start_cmd(self, node): """Return the start command appropriate for the given node.""" args = self.args.copy() @@ -147,14 +153,19 @@ def start_cmd(self, node): args['jmx_port'] = self.jmx_port args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node) args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol) - args['kafka_opts'] = self.security_config.kafka_opts + + if self.kafka_opts_override: + args['kafka_opts'] = "\"%s\"" % self.kafka_opts_override + else: + args['kafka_opts'] = self.security_config.kafka_opts cmd = "export JMX_PORT=%(jmx_port)s; " \ "export LOG_DIR=%(log_dir)s; " \ "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \ "export KAFKA_OPTS=%(kafka_opts)s; " \ "%(console_consumer)s " \ - "--topic %(topic)s --consumer.config %(config_file)s" % args + "--topic %(topic)s " \ + "--consumer.config %(config_file)s " % args if self.new_consumer: assert node.version >= V_0_9_0_0, \ @@ -209,7 +220,15 @@ def _worker(self, idx, node): # Create and upload config file self.logger.info("console_consumer.properties:") - prop_file = self.prop_file(node) + self.security_config = self.kafka.security_config.client_config(node=node, + jaas_override_variables=self.jaas_override_variables) + self.security_config.setup_node(node) + + if self.client_prop_file_override: + prop_file = self.client_prop_file_override + else: + prop_file = self.prop_file(node) + self.logger.info(prop_file) node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file) diff --git a/tests/kafkatest/services/delegation_tokens.py b/tests/kafkatest/services/delegation_tokens.py new file mode 100644 index 00000000000..34da16bcab2 --- /dev/null +++ b/tests/kafkatest/services/delegation_tokens.py @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os.path +from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin + +""" +Delegation tokens is a tool to manage the lifecycle of delegation tokens. +All commands are executed on a secured Kafka node reusing its generated jaas.conf and krb5.conf. +""" + +class DelegationTokens(KafkaPathResolverMixin): + def __init__(self, kafka, context): + self.client_properties_content = """ +security.protocol=SASL_PLAINTEXT +sasl.kerberos.service.name=kafka +""" + self.context = context + self.command_path = self.path.script("kafka-delegation-tokens.sh") + self.kafka_opts = "KAFKA_OPTS=\"-Djava.security.auth.login.config=/mnt/security/jaas.conf " \ + "-Djava.security.krb5.conf=/mnt/security/krb5.conf\" " + self.kafka = kafka + self.bootstrap_server = " --bootstrap-server " + self.kafka.bootstrap_servers('SASL_PLAINTEXT') + self.base_cmd = self.kafka_opts + self.command_path + self.bootstrap_server + self.client_prop_path = os.path.join(self.kafka.PERSISTENT_ROOT, "client.properties") + self.jaas_deleg_conf_path = os.path.join(self.kafka.PERSISTENT_ROOT, "jaas_deleg.conf") + self.token_hmac_path = os.path.join(self.kafka.PERSISTENT_ROOT, "deleg_token_hmac.out") + self.delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "delegation_token.out") + self.expire_delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "expire_delegation_token.out") + self.renew_delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "renew_delegation_token.out") + + self.node = self.kafka.nodes[0] + + def generate_delegation_token(self, maxlifetimeperiod=-1): + self.node.account.create_file(self.client_prop_path, self.client_properties_content) + + cmd = self.base_cmd + " --create" \ + " --max-life-time-period %s" \ + " --command-config %s > %s" % (maxlifetimeperiod, self.client_prop_path, self.delegation_token_out) + self.node.account.ssh(cmd, allow_fail=False) + + def expire_delegation_token(self, hmac): + cmd = self.base_cmd + " --expire" \ + " --expiry-time-period -1" \ + " --hmac %s" \ + " --command-config %s > %s" % (hmac, self.client_prop_path, self.expire_delegation_token_out) + self.node.account.ssh(cmd, allow_fail=False) + + def renew_delegation_token(self, hmac, renew_time_period=-1): + cmd = self.base_cmd + " --renew" \ + " --renew-time-period %s" \ + " --hmac %s" \ + " --command-config %s > %s" \ + % (renew_time_period, hmac, self.client_prop_path, self.renew_delegation_token_out) + return self.node.account.ssh_capture(cmd, allow_fail=False) + + def create_jaas_conf_with_delegation_token(self): + dt = self.parse_delegation_token_out() + jaas_deleg_content = """ +KafkaClient { + org.apache.kafka.common.security.scram.ScramLoginModule required + username="%s" + password="%s" + tokenauth=true; +}; +""" % (dt["tokenid"], dt["hmac"]) + self.node.account.create_file(self.jaas_deleg_conf_path, jaas_deleg_content) + + return jaas_deleg_content + + def token_hmac(self): + dt = self.parse_delegation_token_out() + return dt["hmac"] + + def parse_delegation_token_out(self): + cmd = "tail -1 %s" % self.delegation_token_out + + output_iter = self.node.account.ssh_capture(cmd, allow_fail=False) + output = "" + for line in output_iter: + output += line + + tokenid, hmac, owner, renewers, issuedate, expirydate, maxdate = output.split() + return {"tokenid" : tokenid, + "hmac" : hmac, + "owner" : owner, + "renewers" : renewers, + "issuedate" : issuedate, + "expirydate" :expirydate, + "maxdate" : maxdate} \ No newline at end of file diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py index 621b8e5b55b..853bd83896f 100644 --- a/tests/kafkatest/services/kafka/config_property.py +++ b/tests/kafkatest/services/kafka/config_property.py @@ -50,6 +50,12 @@ LOG_ROLL_TIME_MS = "log.roll.ms" OFFSETS_TOPIC_NUM_PARTITIONS = "offsets.topic.num.partitions" +DELEGATION_TOKEN_MAX_LIFETIME_MS="delegation.token.max.lifetime.ms" +DELEGATION_TOKEN_EXPIRY_TIME_MS="delegation.token.expiry.time.ms" +DELEGATION_TOKEN_MASTER_KEY="delegation.token.master.key" +SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms" + + """ From KafkaConfig.scala diff --git a/tests/kafkatest/services/security/templates/jaas.conf b/tests/kafkatest/services/security/templates/jaas.conf index e2511451e32..3d6c93e09be 100644 --- a/tests/kafkatest/services/security/templates/jaas.conf +++ b/tests/kafkatest/services/security/templates/jaas.conf @@ -15,7 +15,7 @@ {% if static_jaas_conf %} KafkaClient { {% endif %} -{% if client_sasl_mechanism == "GSSAPI" %} +{% if "GSSAPI" in client_sasl_mechanism %} {% if is_ibm_jdk %} com.ibm.security.auth.module.Krb5LoginModule required debug=false credsType=both @@ -33,7 +33,7 @@ KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="client" password="client-secret"; -{% elif client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512" %} +{% elif "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in client_sasl_mechanism %} org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ SecurityConfig.SCRAM_CLIENT_USER }}" password="{{ SecurityConfig.SCRAM_CLIENT_PASSWORD }}"; @@ -65,7 +65,7 @@ KafkaServer { user_client="client-secret" user_kafka="kafka-secret"; {% endif %} -{% if client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512" %} +{% if "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in client_sasl_mechanism %} org.apache.kafka.common.security.scram.ScramLoginModule required username="{{ SecurityConfig.SCRAM_BROKER_USER }}" password="{{ SecurityConfig.SCRAM_BROKER_PASSWORD }}"; diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 7fa2654db49..f339a62313f 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -58,18 +58,21 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None, stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO", enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None, - jaas_override_variables=None): + jaas_override_variables=None, kafka_opts_override="", client_prop_file_override=""): """ - :param max_messages is a number of messages to be produced per producer - :param message_validator checks for an expected format of messages produced. There are - currently two: - * is_int is an integer format; this is default and expected to be used if - num_nodes = 1 - * is_int_with_prefix recommended if num_nodes > 1, because otherwise each producer - will produce exactly same messages, and validation may miss missing messages. - :param compression_types: If None, all producers will not use compression; or a list of - compression types, one per producer (could be "none"). - :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file + Args: + :param max_messages number of messages to be produced per producer + :param message_validator checks for an expected format of messages produced. There are + currently two: + * is_int is an integer format; this is default and expected to be used if + num_nodes = 1 + * is_int_with_prefix recommended if num_nodes > 1, because otherwise each producer + will produce exactly same messages, and validation may miss missing messages. + :param compression_types If None, all producers will not use compression; or a list of compression types, + one per producer (could be "none"). + :param jaas_override_variables A dict of variables to be used in the jaas.conf template file + :param kafka_opts_override Override parameters of the KAFKA_OPTS environment variable + :param client_prop_file_override Override client.properties file used by the consumer """ super(VerifiableProducer, self).__init__(context, num_nodes) self.log_level = log_level @@ -97,6 +100,9 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput self.create_time = create_time self.repeating_keys = repeating_keys self.jaas_override_variables = jaas_override_variables or {} + self.kafka_opts_override = kafka_opts_override + self.client_prop_file_override = client_prop_file_override + def java_class_name(self): return "VerifiableProducer" @@ -125,7 +131,11 @@ def _worker(self, idx, node): self.security_config.setup_node(node) # Create and upload config file - producer_prop_file = self.prop_file(node) + if self.client_prop_file_override: + producer_prop_file = self.client_prop_file_override + else: + producer_prop_file = self.prop_file(node) + if self.acks is not None: self.logger.info("VerifiableProducer (index = %d) will use acks = %s", idx, self.acks) producer_prop_file += "\nacks=%s\n" % self.acks @@ -189,7 +199,11 @@ def _has_output(self, node): def start_cmd(self, node, idx): cmd = "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR - cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts + if self.kafka_opts_override: + cmd += " export KAFKA_OPTS=\"%s\";" % self.kafka_opts_override + else: + cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts + cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG cmd += self.impl.exec_cmd(node) cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol, True, self.offline_nodes)) @@ -207,6 +221,7 @@ def start_cmd(self, node, idx): cmd += " --repeating-keys %s " % str(self.repeating_keys) cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE + cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE) return cmd diff --git a/tests/kafkatest/tests/core/delegation_token_test.py b/tests/kafkatest/tests/core/delegation_token_test.py new file mode 100644 index 00000000000..0b2b6eb63fc --- /dev/null +++ b/tests/kafkatest/tests/core/delegation_token_test.py @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until +from kafkatest.services.kafka import config_property, KafkaService +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.delegation_tokens import DelegationTokens +from kafkatest.services.verifiable_producer import VerifiableProducer + +from datetime import datetime +import time + +""" +Basic tests to validate delegation token support +""" +class DelegationTokenTest(Test): + def __init__(self, test_context): + super(DelegationTokenTest, self).__init__(test_context) + + self.test_context = test_context + self.topic = "topic" + self.zk = ZookeeperService(test_context, num_nodes=1) + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka", + topics={self.topic: {"partitions": 1, "replication-factor": 1}}, + server_prop_overides=[ + [config_property.DELEGATION_TOKEN_MAX_LIFETIME_MS, "604800000"], + [config_property.DELEGATION_TOKEN_EXPIRY_TIME_MS, "86400000"], + [config_property.DELEGATION_TOKEN_MASTER_KEY, "test12345"], + [config_property.SASL_ENABLED_MECHANISMS, "GSSAPI,SCRAM-SHA-256"] + ]) + self.jaas_deleg_conf_path = "/tmp/jaas_deleg.conf" + self.jaas_deleg_conf = "" + self.client_properties_content = """ +security.protocol=SASL_PLAINTEXT +sasl.mechanism=SCRAM-SHA-256 +sasl.kerberos.service.name=kafka +client.id=console-consumer +""" + self.client_kafka_opts=' -Djava.security.auth.login.config=' + self.jaas_deleg_conf_path + + self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, max_messages=1, + throughput=1, kafka_opts_override=self.client_kafka_opts, + client_prop_file_override=self.client_properties_content) + + self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, + kafka_opts_override=self.client_kafka_opts, + client_prop_file_override=self.client_properties_content) + + self.kafka.security_protocol = 'SASL_PLAINTEXT' + self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256' + self.kafka.interbroker_sasl_mechanism = 'GSSAPI' + + + def setUp(self): + self.zk.start() + + def tearDown(self): + self.producer.nodes[0].account.remove(self.jaas_deleg_conf_path) + self.consumer.nodes[0].account.remove(self.jaas_deleg_conf_path) + + def generate_delegation_token(self): + self.logger.debug("Request delegation token") + self.delegation_tokens.generate_delegation_token() + self.jaas_deleg_conf = self.delegation_tokens.create_jaas_conf_with_delegation_token() + + def expire_delegation_token(self): + self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256' + token_hmac = self.delegation_tokens.token_hmac() + self.delegation_tokens.expire_delegation_token(token_hmac) + + + def produce_with_delegation_token(self): + self.producer.acked_values = [] + self.producer.nodes[0].account.create_file(self.jaas_deleg_conf_path, self.jaas_deleg_conf) + self.logger.debug(self.jaas_deleg_conf) + self.producer.start() + + def consume_with_delegation_token(self): + self.logger.debug("Consume messages with delegation token") + + self.consumer.nodes[0].account.create_file(self.jaas_deleg_conf_path, self.jaas_deleg_conf) + self.logger.debug(self.jaas_deleg_conf) + self.consumer.consumer_timeout_ms = 5000 + + self.consumer.start() + self.consumer.wait() + + def get_datetime_ms(self, input_date): + return int(time.mktime(datetime.strptime(input_date,"%Y-%m-%dT%H:%M").timetuple()) * 1000) + + def renew_delegation_token(self): + dt = self.delegation_tokens.parse_delegation_token_out() + orig_expiry_date_ms = self.get_datetime_ms(dt["expirydate"]) + new_expirydate_ms = orig_expiry_date_ms + 1000 + + self.delegation_tokens.renew_delegation_token(dt["hmac"], new_expirydate_ms) + + def test_delegation_token_lifecycle(self): + self.kafka.start() + self.delegation_tokens = DelegationTokens(self.kafka, self.test_context) + + self.generate_delegation_token() + self.renew_delegation_token() + self.produce_with_delegation_token() + wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30, + err_msg="Expected producer to still be producing.") + assert 1 == self.producer.num_acked, "number of acked messages: %d" % self.producer.num_acked + + self.consume_with_delegation_token() + num_consumed = len(self.consumer.messages_consumed[1]) + assert 1 == num_consumed, "number of consumed messages: %d" % num_consumed + + self.expire_delegation_token() + + self.produce_with_delegation_token() + assert 0 == self.producer.num_acked, "number of acked messages: %d" % self.producer.num_acked \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add system tests for delegation token based authentication > ---------------------------------------------------------- > > Key: KAFKA-4544 > URL: https://issues.apache.org/jira/browse/KAFKA-4544 > Project: Kafka > Issue Type: Sub-task > Components: security > Reporter: Ashish Singh > Assignee: Attila Sasvari > Priority: Major > Fix For: 2.2.0 > > > Add system tests for delegation token based authentication. -- This message was sent by Atlassian JIRA (v7.6.3#76005)