[ 
https://issues.apache.org/jira/browse/KAFKA-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706703#comment-16706703
 ] 

ASF GitHub Bot commented on KAFKA-4544:
---------------------------------------

omkreddy closed pull request #5660: KAFKA-4544: Add system tests for delegation 
token based authentication
URL: https://github.com/apache/kafka/pull/5660
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub will not show its
diff after the merge, so the diff is supplied below:

diff --git a/tests/kafkatest/services/console_consumer.py 
b/tests/kafkatest/services/console_consumer.py
index 65c9fa589c0..dfbec9f83da 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -60,8 +60,9 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadService)
     def __init__(self, context, num_nodes, kafka, topic, 
group_id="test-consumer-group", new_consumer=True,
                  message_validator=None, from_beginning=True, 
consumer_timeout_ms=None, version=DEV_BRANCH,
                  client_id="console-consumer", print_key=False, 
jmx_object_names=None, jmx_attributes=None,
-                 enable_systest_events=False, stop_timeout_sec=30, 
print_timestamp=False,
-                 isolation_level="read_uncommitted", 
jaas_override_variables=None):
+                 enable_systest_events=False, stop_timeout_sec=35, 
print_timestamp=False,
+                 isolation_level="read_uncommitted", 
jaas_override_variables=None,
+                 kafka_opts_override="", client_prop_file_override=""):
         """
         Args:
             context:                    standard context
@@ -83,7 +84,8 @@ def __init__(self, context, num_nodes, kafka, topic, 
group_id="test-consumer-gro
             print_timestamp             if True, print each message's 
timestamp as well
             isolation_level             How to handle transactional messages.
             jaas_override_variables     A dict of variables to be used in the 
jaas.conf template file
-
+            kafka_opts_override         Override parameters of the KAFKA_OPTS 
environment variable
+            client_prop_file_override   Override client.properties file used 
by the consumer
         """
         JmxMixin.__init__(self, num_nodes=num_nodes, 
jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
                           root=ConsoleConsumer.PERSISTENT_ROOT)
@@ -116,6 +118,9 @@ def __init__(self, context, num_nodes, kafka, topic, 
group_id="test-consumer-gro
 
         self.print_timestamp = print_timestamp
         self.jaas_override_variables = jaas_override_variables or {}
+        self.kafka_opts_override = kafka_opts_override
+        self.client_prop_file_override = client_prop_file_override
+
 
     def prop_file(self, node):
         """Return a string which can be used to create a configuration file 
appropriate for the given node."""
@@ -134,6 +139,7 @@ def prop_file(self, node):
         prop_file += str(self.security_config)
         return prop_file
 
+
     def start_cmd(self, node):
         """Return the start command appropriate for the given node."""
         args = self.args.copy()
@@ -147,14 +153,19 @@ def start_cmd(self, node):
         args['jmx_port'] = self.jmx_port
         args['console_consumer'] = 
self.path.script("kafka-console-consumer.sh", node)
         args['broker_list'] = 
self.kafka.bootstrap_servers(self.security_config.security_protocol)
-        args['kafka_opts'] = self.security_config.kafka_opts
+
+        if self.kafka_opts_override:
+            args['kafka_opts'] = "\"%s\"" % self.kafka_opts_override
+        else:
+            args['kafka_opts'] = self.security_config.kafka_opts
 
         cmd = "export JMX_PORT=%(jmx_port)s; " \
               "export LOG_DIR=%(log_dir)s; " \
               "export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \
               "export KAFKA_OPTS=%(kafka_opts)s; " \
               "%(console_consumer)s " \
-              "--topic %(topic)s --consumer.config %(config_file)s" % args
+              "--topic %(topic)s " \
+              "--consumer.config %(config_file)s " % args
 
         if self.new_consumer:
             assert node.version >= V_0_9_0_0, \
@@ -209,7 +220,15 @@ def _worker(self, idx, node):
         # Create and upload config file
         self.logger.info("console_consumer.properties:")
 
-        prop_file = self.prop_file(node)
+        self.security_config = 
self.kafka.security_config.client_config(node=node,
+                                                                        
jaas_override_variables=self.jaas_override_variables)
+        self.security_config.setup_node(node)
+
+        if self.client_prop_file_override:
+            prop_file = self.client_prop_file_override
+        else:
+            prop_file = self.prop_file(node)
+
         self.logger.info(prop_file)
         node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
 
diff --git a/tests/kafkatest/services/delegation_tokens.py 
b/tests/kafkatest/services/delegation_tokens.py
new file mode 100644
index 00000000000..34da16bcab2
--- /dev/null
+++ b/tests/kafkatest/services/delegation_tokens.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os.path
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+
+"""
+Delegation tokens is a tool to manage the lifecycle of delegation tokens.
+All commands are executed on a secured Kafka node reusing its generated 
jaas.conf and krb5.conf.
+"""
+
+class DelegationTokens(KafkaPathResolverMixin):
+    def __init__(self, kafka, context):
+        self.client_properties_content = """
+security.protocol=SASL_PLAINTEXT
+sasl.kerberos.service.name=kafka
+"""
+        self.context = context
+        self.command_path = self.path.script("kafka-delegation-tokens.sh")
+        self.kafka_opts = 
"KAFKA_OPTS=\"-Djava.security.auth.login.config=/mnt/security/jaas.conf " \
+                          "-Djava.security.krb5.conf=/mnt/security/krb5.conf\" 
"
+        self.kafka = kafka
+        self.bootstrap_server = " --bootstrap-server " + 
self.kafka.bootstrap_servers('SASL_PLAINTEXT')
+        self.base_cmd = self.kafka_opts + self.command_path + 
self.bootstrap_server
+        self.client_prop_path = os.path.join(self.kafka.PERSISTENT_ROOT, 
"client.properties")
+        self.jaas_deleg_conf_path = os.path.join(self.kafka.PERSISTENT_ROOT, 
"jaas_deleg.conf")
+        self.token_hmac_path = os.path.join(self.kafka.PERSISTENT_ROOT, 
"deleg_token_hmac.out")
+        self.delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, 
"delegation_token.out")
+        self.expire_delegation_token_out = 
os.path.join(self.kafka.PERSISTENT_ROOT, "expire_delegation_token.out")
+        self.renew_delegation_token_out = 
os.path.join(self.kafka.PERSISTENT_ROOT, "renew_delegation_token.out")
+
+        self.node = self.kafka.nodes[0]
+
+    def generate_delegation_token(self, maxlifetimeperiod=-1):
+        self.node.account.create_file(self.client_prop_path, 
self.client_properties_content)
+
+        cmd = self.base_cmd + "  --create" \
+                              "  --max-life-time-period %s" \
+                              "  --command-config %s > %s" % 
(maxlifetimeperiod, self.client_prop_path, self.delegation_token_out)
+        self.node.account.ssh(cmd, allow_fail=False)
+
+    def expire_delegation_token(self, hmac):
+        cmd = self.base_cmd + "  --expire" \
+                              "  --expiry-time-period -1" \
+                              "  --hmac %s" \
+                              "  --command-config %s > %s" % (hmac, 
self.client_prop_path, self.expire_delegation_token_out)
+        self.node.account.ssh(cmd, allow_fail=False)
+
+    def renew_delegation_token(self, hmac, renew_time_period=-1):
+        cmd = self.base_cmd + "  --renew" \
+                              "  --renew-time-period %s" \
+                              "  --hmac %s" \
+                              "  --command-config %s > %s" \
+              % (renew_time_period, hmac, self.client_prop_path, 
self.renew_delegation_token_out)
+        return self.node.account.ssh_capture(cmd, allow_fail=False)
+
+    def create_jaas_conf_with_delegation_token(self):
+        dt = self.parse_delegation_token_out()
+        jaas_deleg_content = """
+KafkaClient {
+  org.apache.kafka.common.security.scram.ScramLoginModule required
+  username="%s"
+  password="%s"
+  tokenauth=true;
+};
+""" % (dt["tokenid"], dt["hmac"])
+        self.node.account.create_file(self.jaas_deleg_conf_path, 
jaas_deleg_content)
+
+        return jaas_deleg_content
+
+    def token_hmac(self):
+        dt = self.parse_delegation_token_out()
+        return dt["hmac"]
+
+    def parse_delegation_token_out(self):
+        cmd = "tail -1 %s" % self.delegation_token_out
+
+        output_iter = self.node.account.ssh_capture(cmd, allow_fail=False)
+        output = ""
+        for line in output_iter:
+            output += line
+
+        tokenid, hmac, owner, renewers, issuedate, expirydate, maxdate = 
output.split()
+        return {"tokenid" : tokenid,
+                "hmac" : hmac,
+                "owner" : owner,
+                "renewers" : renewers,
+                "issuedate" : issuedate,
+                "expirydate" :expirydate,
+                "maxdate" : maxdate}
\ No newline at end of file
diff --git a/tests/kafkatest/services/kafka/config_property.py 
b/tests/kafkatest/services/kafka/config_property.py
index 621b8e5b55b..853bd83896f 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -50,6 +50,12 @@
 LOG_ROLL_TIME_MS = "log.roll.ms"
 OFFSETS_TOPIC_NUM_PARTITIONS = "offsets.topic.num.partitions"
 
+DELEGATION_TOKEN_MAX_LIFETIME_MS="delegation.token.max.lifetime.ms"
+DELEGATION_TOKEN_EXPIRY_TIME_MS="delegation.token.expiry.time.ms"
+DELEGATION_TOKEN_MASTER_KEY="delegation.token.master.key"
+SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"
+
+
 """
 From KafkaConfig.scala
 
diff --git a/tests/kafkatest/services/security/templates/jaas.conf 
b/tests/kafkatest/services/security/templates/jaas.conf
index e2511451e32..3d6c93e09be 100644
--- a/tests/kafkatest/services/security/templates/jaas.conf
+++ b/tests/kafkatest/services/security/templates/jaas.conf
@@ -15,7 +15,7 @@
 {% if static_jaas_conf %}
 KafkaClient {
 {% endif %}
-{% if client_sasl_mechanism == "GSSAPI" %}
+{% if "GSSAPI" in client_sasl_mechanism %}
 {% if is_ibm_jdk %}
     com.ibm.security.auth.module.Krb5LoginModule required debug=false
     credsType=both
@@ -33,7 +33,7 @@ KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="client"
        password="client-secret";
-{% elif client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == 
"SCRAM-SHA-512" %}
+{% elif "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in 
client_sasl_mechanism  %}
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="{{ SecurityConfig.SCRAM_CLIENT_USER }}"
        password="{{ SecurityConfig.SCRAM_CLIENT_PASSWORD }}";
@@ -65,7 +65,7 @@ KafkaServer {
        user_client="client-secret"
        user_kafka="kafka-secret";
 {% endif %}
-{% if client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == 
"SCRAM-SHA-512" %}
+{% if "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in 
client_sasl_mechanism %}
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="{{ SecurityConfig.SCRAM_BROKER_USER }}"
        password="{{ SecurityConfig.SCRAM_BROKER_PASSWORD }}";
diff --git a/tests/kafkatest/services/verifiable_producer.py 
b/tests/kafkatest/services/verifiable_producer.py
index 7fa2654db49..f339a62313f 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -58,18 +58,21 @@ def __init__(self, context, num_nodes, kafka, topic, 
max_messages=-1, throughput
                  message_validator=is_int, compression_types=None, 
version=DEV_BRANCH, acks=None,
                  stop_timeout_sec=150, request_timeout_sec=30, 
log_level="INFO",
                  enable_idempotence=False, offline_nodes=[], create_time=-1, 
repeating_keys=None,
-                 jaas_override_variables=None):
+                 jaas_override_variables=None, kafka_opts_override="", 
client_prop_file_override=""):
         """
-        :param max_messages is a number of messages to be produced per producer
-        :param message_validator checks for an expected format of messages 
produced. There are
-        currently two:
-               * is_int is an integer format; this is default and expected to 
be used if
-               num_nodes = 1
-               * is_int_with_prefix recommended if num_nodes > 1, because 
otherwise each producer
-               will produce exactly same messages, and validation may miss 
missing messages.
-        :param compression_types: If None, all producers will not use 
compression; or a list of
-        compression types, one per producer (could be "none").
-        :param jaas_override_variables: A dict of variables to be used in the 
jaas.conf template file
+        Args:
+            :param max_messages                number of messages to be 
produced per producer
+            :param message_validator           checks for an expected format 
of messages produced. There are
+                                               currently two:
+                                               * is_int is an integer format; 
this is default and expected to be used if
+                                                 num_nodes = 1
+                                               * is_int_with_prefix 
recommended if num_nodes > 1, because otherwise each producer
+                                                 will produce exactly same 
messages, and validation may miss missing messages.
+            :param compression_types           If None, all producers will not 
use compression; or a list of compression types,
+                                               one per producer (could be 
"none").
+            :param jaas_override_variables     A dict of variables to be used 
in the jaas.conf template file
+            :param kafka_opts_override         Override parameters of the 
KAFKA_OPTS environment variable
+            :param client_prop_file_override   Override client.properties file 
used by the consumer
         """
         super(VerifiableProducer, self).__init__(context, num_nodes)
         self.log_level = log_level
@@ -97,6 +100,9 @@ def __init__(self, context, num_nodes, kafka, topic, 
max_messages=-1, throughput
         self.create_time = create_time
         self.repeating_keys = repeating_keys
         self.jaas_override_variables = jaas_override_variables or {}
+        self.kafka_opts_override = kafka_opts_override
+        self.client_prop_file_override = client_prop_file_override
+
 
     def java_class_name(self):
         return "VerifiableProducer"
@@ -125,7 +131,11 @@ def _worker(self, idx, node):
         self.security_config.setup_node(node)
 
         # Create and upload config file
-        producer_prop_file = self.prop_file(node)
+        if self.client_prop_file_override:
+            producer_prop_file = self.client_prop_file_override
+        else:
+            producer_prop_file = self.prop_file(node)
+
         if self.acks is not None:
             self.logger.info("VerifiableProducer (index = %d) will use acks = 
%s", idx, self.acks)
             producer_prop_file += "\nacks=%s\n" % self.acks
@@ -189,7 +199,11 @@ def _has_output(self, node):
 
     def start_cmd(self, node, idx):
         cmd  = "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
-        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+        if self.kafka_opts_override:
+            cmd += " export KAFKA_OPTS=\"%s\";" % self.kafka_opts_override
+        else:
+            cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " 
% VerifiableProducer.LOG4J_CONFIG
         cmd += self.impl.exec_cmd(node)
         cmd += " --topic %s --broker-list %s" % (self.topic, 
self.kafka.bootstrap_servers(self.security_config.security_protocol, True, 
self.offline_nodes))
@@ -207,6 +221,7 @@ def start_cmd(self, node, idx):
             cmd += " --repeating-keys %s " % str(self.repeating_keys)
 
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
+
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, 
VerifiableProducer.STDOUT_CAPTURE)
         return cmd
 
diff --git a/tests/kafkatest/tests/core/delegation_token_test.py 
b/tests/kafkatest/tests/core/delegation_token_test.py
new file mode 100644
index 00000000000..0b2b6eb63fc
--- /dev/null
+++ b/tests/kafkatest/tests/core/delegation_token_test.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import config_property, KafkaService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.delegation_tokens import DelegationTokens
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+from datetime import datetime
+import time
+
+"""
+Basic tests to validate delegation token support
+"""
+class DelegationTokenTest(Test):
+    def __init__(self, test_context):
+        super(DelegationTokenTest, self).__init__(test_context)
+
+        self.test_context = test_context
+        self.topic = "topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, 
zk_chroot="/kafka",
+                                  topics={self.topic: {"partitions": 1, 
"replication-factor": 1}},
+                                  server_prop_overides=[
+                                      
[config_property.DELEGATION_TOKEN_MAX_LIFETIME_MS, "604800000"],
+                                      
[config_property.DELEGATION_TOKEN_EXPIRY_TIME_MS, "86400000"],
+                                      
[config_property.DELEGATION_TOKEN_MASTER_KEY, "test12345"],
+                                      
[config_property.SASL_ENABLED_MECHANISMS, "GSSAPI,SCRAM-SHA-256"]
+                                  ])
+        self.jaas_deleg_conf_path = "/tmp/jaas_deleg.conf"
+        self.jaas_deleg_conf = ""
+        self.client_properties_content = """
+security.protocol=SASL_PLAINTEXT
+sasl.mechanism=SCRAM-SHA-256
+sasl.kerberos.service.name=kafka
+client.id=console-consumer
+"""
+        self.client_kafka_opts=' -Djava.security.auth.login.config=' + 
self.jaas_deleg_conf_path
+
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, 
kafka=self.kafka, topic=self.topic, max_messages=1,
+                                       throughput=1, 
kafka_opts_override=self.client_kafka_opts,
+                                       
client_prop_file_override=self.client_properties_content)
+
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, 
kafka=self.kafka, topic=self.topic,
+                                        
kafka_opts_override=self.client_kafka_opts,
+                                        
client_prop_file_override=self.client_properties_content)
+
+        self.kafka.security_protocol = 'SASL_PLAINTEXT'
+        self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256'
+        self.kafka.interbroker_sasl_mechanism = 'GSSAPI'
+
+
+    def setUp(self):
+        self.zk.start()
+
+    def tearDown(self):
+        self.producer.nodes[0].account.remove(self.jaas_deleg_conf_path)
+        self.consumer.nodes[0].account.remove(self.jaas_deleg_conf_path)
+
+    def generate_delegation_token(self):
+        self.logger.debug("Request delegation token")
+        self.delegation_tokens.generate_delegation_token()
+        self.jaas_deleg_conf = 
self.delegation_tokens.create_jaas_conf_with_delegation_token()
+
+    def expire_delegation_token(self):
+        self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256'
+        token_hmac = self.delegation_tokens.token_hmac()
+        self.delegation_tokens.expire_delegation_token(token_hmac)
+
+
+    def produce_with_delegation_token(self):
+        self.producer.acked_values = []
+        self.producer.nodes[0].account.create_file(self.jaas_deleg_conf_path, 
self.jaas_deleg_conf)
+        self.logger.debug(self.jaas_deleg_conf)
+        self.producer.start()
+
+    def consume_with_delegation_token(self):
+        self.logger.debug("Consume messages with delegation token")
+
+        self.consumer.nodes[0].account.create_file(self.jaas_deleg_conf_path, 
self.jaas_deleg_conf)
+        self.logger.debug(self.jaas_deleg_conf)
+        self.consumer.consumer_timeout_ms = 5000
+
+        self.consumer.start()
+        self.consumer.wait()
+
+    def get_datetime_ms(self, input_date):
+        return 
int(time.mktime(datetime.strptime(input_date,"%Y-%m-%dT%H:%M").timetuple()) * 
1000)
+
+    def renew_delegation_token(self):
+        dt = self.delegation_tokens.parse_delegation_token_out()
+        orig_expiry_date_ms = self.get_datetime_ms(dt["expirydate"])
+        new_expirydate_ms = orig_expiry_date_ms + 1000
+
+        self.delegation_tokens.renew_delegation_token(dt["hmac"], 
new_expirydate_ms)
+
+    def test_delegation_token_lifecycle(self):
+        self.kafka.start()
+        self.delegation_tokens = DelegationTokens(self.kafka, 
self.test_context)
+
+        self.generate_delegation_token()
+        self.renew_delegation_token()
+        self.produce_with_delegation_token()
+        wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30,
+                   err_msg="Expected producer to still be producing.")
+        assert 1 == self.producer.num_acked, "number of acked messages: %d" % 
self.producer.num_acked
+
+        self.consume_with_delegation_token()
+        num_consumed = len(self.consumer.messages_consumed[1])
+        assert 1 == num_consumed, "number of consumed messages: %d" % 
num_consumed
+
+        self.expire_delegation_token()
+
+        self.produce_with_delegation_token()
+        assert 0 == self.producer.num_acked, "number of acked messages: %d" % 
self.producer.num_acked
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add system tests for delegation token based authentication
> ----------------------------------------------------------
>
>                 Key: KAFKA-4544
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4544
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: security
>            Reporter: Ashish Singh
>            Assignee: Attila Sasvari
>            Priority: Major
>             Fix For: 2.2.0
>
>
> Add system tests for delegation token based authentication.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to