junrao commented on a change in pull request #11284:
URL: https://github.com/apache/kafka/pull/11284#discussion_r734831120



##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Locale;
+import java.util.Map;
+import javax.net.ssl.SSLSocketFactory;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
+import org.apache.kafka.common.security.ssl.SslFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>ConfigurationUtils</code> is a utility class to perform basic 
configuration-related
+ * logic and is separated out here for easier, more direct testing.
+ */
+
+public class ConfigurationUtils {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ConfigurationUtils.class);
+
+    private final Map<String, ?> configs;
+
+    private final String prefix;
+
+    public ConfigurationUtils(Map<String, ?> configs) {
+        this(configs, null);
+    }
+
+    public ConfigurationUtils(Map<String, ?> configs, String saslMechanism) {

Review comment:
       It seems that the configs passed to the following method already has the 
listener prefix removed.
   
   ` void configure(Map<String, ?> configs, String saslMechanism, 
List<AppConfigurationEntry> jaasConfigEntries);
   `
   
   The logic does that is in ChannelBuilders.create(), where we have the 
following code.
   
   `        Map<String, Object> configs = channelBuilderConfigs(config, 
listenerName);
   `
   
   So, the code in the PR is correct.

##########
File path: build.gradle
##########
@@ -829,6 +829,7 @@ project(':core') {
     implementation libs.jacksonDataformatCsv
     implementation libs.jacksonJDK8Datatypes
     implementation libs.joptSimple
+    implementation libs.jose4j

Review comment:
       This is fine. It will be useful to check all other projects that depend 
on clients and see if the jose4j dependency needs to be added explicitly. For 
example, it's convenient to add it to shell and potentially connect:runtime.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * <code>OAuthBearerValidatorCallbackHandler</code> is an {@link 
AuthenticateCallbackHandler} that
+ * accepts {@link OAuthBearerValidatorCallback} and {@link 
OAuthBearerExtensionsValidatorCallback}
+ * callbacks to implement OAuth/OIDC validation. This callback handler is 
intended only to be used
+ * on the Kafka broker side as it will receive a {@link 
OAuthBearerValidatorCallback} that includes
+ * the JWT provided by the Kafka client. That JWT is validated in terms of 
format, expiration,
+ * signature, and audience and issuer (if desired). This callback handler is 
the broker side of the
+ * OAuth functionality, whereas {@link OAuthBearerLoginCallbackHandler} is 
used by clients.
+ * </p>
+ *
+ * <p>
+ * This {@link AuthenticateCallbackHandler} is enabled in the broker 
configuration by setting the
+ * {@link 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs#SASL_SERVER_CALLBACK_HANDLER_CLASS}
+ * like so:
+ *
+ * <code>
+ * listener.name.<listener 
name>.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
+ * </code>
+ * </p>
+ *
+ * <p>
+ * The JAAS configuration for OAuth is also needed. If using OAuth for 
inter-broker communication,
+ * the options are those specified in {@link OAuthBearerLoginCallbackHandler}. 
If <b>not</b> using
+ * OAuth for inter-broker communication, but using it for validation, a dummy 
JAAS option named
+ * <code>unsecuredLoginStringClaim_sub</code> is needed:
+ *
+ * <code>
+ * listener.name.<listener 
name>.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required \

Review comment:
       Hmm, are OAuthBearerLoginModule and unsecuredLoginStringClaim_sub needed 
on the server side in the secure mode? The latter wasn't mentioned in the KIP.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandlerTest.java
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
+import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+public class OAuthBearerLoginCallbackHandlerTest extends OAuthBearerTest {
+
+    @Test
+    public void testHandleTokenCallback() throws Exception {
+        Map<String, ?> configs = getSaslConfigs();
+        AccessTokenBuilder builder = new AccessTokenBuilder();
+        String accessToken = builder.build();
+        AccessTokenRetriever accessTokenRetriever = () -> accessToken;
+
+        OAuthBearerLoginCallbackHandler handler = 
createHandler(accessTokenRetriever, configs);
+
+        try {
+            OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+            handler.handle(new Callback[] {callback});
+
+            assertNotNull(callback.token());
+            OAuthBearerToken token = callback.token();
+            assertEquals(accessToken, token.value());
+            assertEquals(builder.subject(), token.principalName());
+            assertEquals(builder.expirationSeconds() * 1000, 
token.lifetimeMs());
+            assertEquals(builder.issuedAtSeconds() * 1000, 
token.startTimeMs());
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testHandleSaslExtensionsCallback() throws Exception {
+        OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
+        Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com";);
+        Map<String, Object> jaasConfig = new HashMap<>();
+        jaasConfig.put(CLIENT_ID_CONFIG, "an ID");
+        jaasConfig.put(CLIENT_SECRET_CONFIG, "a secret");
+        jaasConfig.put("extension_foo", "1");
+        jaasConfig.put("extension_bar", 2);
+        jaasConfig.put("EXTENSION_baz", "3");
+        configureHandler(handler, configs, jaasConfig);
+
+        try {
+            SaslExtensionsCallback callback = new SaslExtensionsCallback();
+            handler.handle(new Callback[]{callback});
+
+            assertNotNull(callback.extensions());
+            Map<String, String> extensions = callback.extensions().map();
+            assertEquals("1", extensions.get("foo"));
+            assertEquals("2", extensions.get("bar"));
+            assertNull(extensions.get("baz"));
+            assertEquals(2, extensions.size());
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testHandleSaslExtensionsCallbackWithInvalidExtension() {
+        String illegalKey = "extension_" + 
OAuthBearerClientInitialResponse.AUTH_KEY;
+
+        OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
+        Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com";);
+        Map<String, Object> jaasConfig = new HashMap<>();
+        jaasConfig.put(CLIENT_ID_CONFIG, "an ID");
+        jaasConfig.put(CLIENT_SECRET_CONFIG, "a secret");
+        jaasConfig.put(illegalKey, "this key isn't allowed per 
OAuthBearerClientInitialResponse.validateExtensions");
+        configureHandler(handler, configs, jaasConfig);
+
+        try {
+            SaslExtensionsCallback callback = new SaslExtensionsCallback();
+            assertThrowsWithMessage(ConfigException.class,
+                () -> handler.handle(new Callback[]{callback}),
+                "Extension name " + OAuthBearerClientInitialResponse.AUTH_KEY 
+ " is invalid");
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testInvalidCallbackGeneratesUnsupportedCallbackException() {
+        Map<String, ?> configs = getSaslConfigs();
+        OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
+        AccessTokenRetriever accessTokenRetriever = () -> "foo";
+        AccessTokenValidator accessTokenValidator = 
AccessTokenValidatorFactory.create(configs);
+        handler.init(accessTokenRetriever, accessTokenValidator);
+
+        try {
+            Callback unsupportedCallback = new Callback() { };
+            assertThrows(UnsupportedCallbackException.class, () -> 
handler.handle(new Callback[]{unsupportedCallback}));
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testInvalidAccessToken() throws Exception {
+        testInvalidAccessToken("this isn't valid", "Malformed JWT provided");
+        testInvalidAccessToken("this.isn't.valid", "malformed Base64 URL 
encoded value");
+        testInvalidAccessToken(createAccessKey("this", "isn't", "valid"), 
"malformed JSON");
+        testInvalidAccessToken(createAccessKey("{}", "{}", "{}"), "exp value 
must be non-null");
+    }
+
+    @Test
+    public void testMissingAccessToken() {
+        AccessTokenRetriever accessTokenRetriever = () -> {
+            throw new IOException("The token endpoint response access_token 
value must be non-null");
+        };
+        Map<String, ?> configs = getSaslConfigs();
+        OAuthBearerLoginCallbackHandler handler = 
createHandler(accessTokenRetriever, configs);
+
+        try {
+            OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
+            assertThrowsWithMessage(IOException.class,
+                () -> handler.handle(new Callback[]{callback}),
+                "token endpoint response access_token value must be non-null");
+        } finally {
+            handler.close();
+        }
+    }
+
+    @Test
+    public void testNotConfigured() {
+        OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
+        assertThrowsWithMessage(IllegalStateException.class, () -> 
handler.handle(new Callback[] {}), "first call the configure or init method");
+    }
+
+    @Test
+    public void testConfigureWithAccessTokenFile() throws Exception {
+        String expected = "{}";
+
+        File tmpDir = createTempDir("access-token");
+        File accessTokenFile = createTempFile(tmpDir, "access-token-", 
".json", expected);
+
+        OAuthBearerLoginCallbackHandler handler = new 
OAuthBearerLoginCallbackHandler();
+        Map<String, ?> configs = 
getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, 
accessTokenFile.toURI().toString());
+        Map<String, Object> jaasConfigs = Collections.emptyMap();
+        configureHandler(handler, configs, jaasConfigs);

Review comment:
       This test doesn't seem to verify anything. Ditto for the next test.

##########
File path: clients/src/test/resources/log4j.properties
##########
@@ -12,10 +12,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=OFF, stdout
+log4j.rootLogger=INFO, stdout

Review comment:
       Are the changes in this file needed?

##########
File path: 
tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTest.java
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
+import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC;
+import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
+import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_DOC;
+import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
+import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_DOC;
+import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
+import static 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_DOC;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
+import 
org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetriever;
+import 
org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetrieverFactory;
+import 
org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidator;
+import 
org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidatorFactory;
+import 
org.apache.kafka.common.security.oauthbearer.secured.CloseableVerificationKeyResolver;
+import 
org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler;
+import 
org.apache.kafka.common.security.oauthbearer.secured.VerificationKeyResolverFactory;
+import org.apache.kafka.common.utils.Exit;
+
+public class OAuthCompatibilityTest {
+
+    public static void main(String[] args) {
+        String description = String.format(
+            "This tool is used to verify OAuth/OIDC provider 
compatibility.%n%n" +
+            "To use, first export KAFKA_OPTS with Java system properties that 
match%n" +
+            "your OAuth/OIDC configuration. Next, run the following script 
to%n" +
+            "execute the test:%n%n" +
+            "    ./bin/kafka-run-class.sh %s" +
+            "%n%n" +
+            "Please refer to the following source files for OAuth/OIDC client 
and%n" +
+            "broker configuration options:" +
+            "%n%n" +
+            "    %s%n" +
+            "    %s",
+            OAuthCompatibilityTest.class.getName(),
+            SaslConfigs.class.getName(),
+            OAuthBearerLoginCallbackHandler.class.getName());
+
+        ArgumentParser parser = ArgumentParsers
+            .newArgumentParser("oauth-compatibility-test")
+            .defaultHelp(true)
+            .description(description);
+
+        parser.addArgument("--connect-timeout-ms")
+            .type(Long.class)
+            .dest("connectTimeoutMs")
+            .help(SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC);
+        parser.addArgument("--read-timeout-ms")
+            .type(Long.class)
+            .dest("readTimeoutMs")
+            .help(SASL_LOGIN_READ_TIMEOUT_MS_DOC);
+        parser.addArgument("--retry-backoff-ms")
+            .type(Long.class)
+            .dest("retryBackoffMs")
+            .help(SASL_LOGIN_RETRY_BACKOFF_MS_DOC);
+        parser.addArgument("--retry-backoff-max-ms")
+            .type(Long.class)
+            .dest("retryBackoffMax")
+            .help(SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC);
+        parser.addArgument("--scope-claim-name")
+            .dest("scopeClaimName")
+            .help(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC);
+        parser.addArgument("--sub-claim-name")
+            .dest("subClaimName")
+            .help(SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC);
+        parser.addArgument("--token-endpoint-url")
+            .dest("tokenEndpointUrl")
+            .help(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC);
+        parser.addArgument("--jwks-endpoint-url")
+            .dest("jwksEndpointUrl")
+            .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC);
+        parser.addArgument("--jwks-endpoint-refresh-ms")
+            .type(Long.class)
+            .dest("jwksEndpointRefreshMs")
+            .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC);
+        parser.addArgument("--clock-skew-seconds")
+            .type(Integer.class)
+            .dest("clockSkewSeconds")
+            .help(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC);
+        parser.addArgument("--expected-audience")
+            .dest("expectedAudience")
+            .help(SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC);
+        parser.addArgument("--expected-issuer")
+            .dest("expectedIssuer")
+            .help(SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
+
+        parser.addArgument("--client-id")
+            .dest("clientId")
+            .help(CLIENT_ID_DOC);
+        parser.addArgument("--client-secret")
+            .dest("clientSecret")
+            .help(CLIENT_SECRET_DOC);
+        parser.addArgument("--scope")
+            .dest("scope")
+            .help(SCOPE_DOC);
+
+        Namespace namespace;
+
+        try {
+            namespace = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            parser.handleError(e);
+            Exit.exit(1);
+            return;
+        }
+
+        Map<String, ?> configs = getConfigs(namespace);
+        Map<String, Object> jaasConfigs = getJaasConfigs(namespace);
+
+        try {
+            String accessToken;
+
+            {
+                // Client side...
+                try (AccessTokenRetriever atr = 
AccessTokenRetrieverFactory.create(configs, jaasConfigs)) {
+                    atr.init();
+                    AccessTokenValidator atv = 
AccessTokenValidatorFactory.create(configs);
+                    System.out.println("PASSED 1/5: client configuration");
+
+                    accessToken = atr.retrieve();
+                    System.out.println("PASSED 2/5: client JWT retrieval");
+
+                    atv.validate(accessToken);
+                    System.out.println("PASSED 3/5: client JWT validation");
+                }
+            }
+
+            {
+                // Broker side...
+                try (CloseableVerificationKeyResolver vkr = 
VerificationKeyResolverFactory.create(configs, jaasConfigs)) {
+                    vkr.init();
+                    AccessTokenValidator atv = 
AccessTokenValidatorFactory.create(configs, vkr);
+                    System.out.println("PASSED 4/5: broker configuration");
+
+                    atv.validate(accessToken);
+                    System.out.println("PASSED 5/5: broker JWT validation");
+                }
+            }
+
+            System.out.println("SUCCESS");
+            Exit.exit(0);
+        } catch (Throwable t) {
+            System.out.println("FAILED:");
+            t.printStackTrace();
+
+            if (t instanceof ConfigException) {
+                System.out.printf("%n");
+                parser.printHelp();
+            }
+
+            Exit.exit(1);
+        }
+    }
+
+    private static Map<String, ?> getConfigs(Namespace namespace) {
+        Map<String, Object> c = new HashMap<>();
+        maybeAddLong(namespace, "connectTimeoutMs", c, 
SASL_LOGIN_CONNECT_TIMEOUT_MS);

Review comment:
       This and the next one seem to be of type int.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.jose4j.jwk.HttpsJwks;
+import org.jose4j.jwk.JsonWebKey;
+import org.jose4j.lang.JoseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link HttpsJwks} that will periodically refresh the JWKS 
cache to reduce or
+ * even prevent HTTP/HTTPS traffic in the hot path of validation. It is 
assumed that it's
+ * possible to receive a JWT that contains a <code>kid</code> that points to 
yet-unknown JWK,
+ * thus requiring a connection to the OAuth/OIDC provider to be made. 
Hopefully, in practice,
+ * keys are made available for some amount of time before they're used within 
JWTs.
+ *
+ * This instance is created and provided to the
+ * {@link org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver} that is 
used when using
+ * an HTTP-/HTTPS-based {@link 
org.jose4j.keys.resolvers.VerificationKeyResolver}, which is then
+ * provided to the {@link ValidatorAccessTokenValidator} to use in validating 
the signature of
+ * a JWT.
+ *
+ * @see org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver
+ * @see org.jose4j.keys.resolvers.VerificationKeyResolver
+ * @see ValidatorAccessTokenValidator
+ */
+
+public final class RefreshingHttpsJwks extends HttpsJwks implements Initable, 
Closeable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RefreshingHttpsJwks.class);
+
+    private static final int MISSING_KEY_ID_CACHE_MAX_ENTRIES = 16;
+
+    private static final long MISSING_KEY_ID_CACHE_IN_FLIGHT_MS = 60000;
+
+    private static final int MISSING_KEY_ID_MAX_KEY_LENGTH = 1000;
+
+    private static final int SHUTDOWN_TIMEOUT = 10;
+
+    private static final TimeUnit SHUTDOWN_TIME_UNIT = TimeUnit.SECONDS;
+
+    private final ScheduledExecutorService executorService;
+
+    private final long refreshMs;
+
+    private final ReadWriteLock refreshLock = new ReentrantReadWriteLock();
+
+    private final Map<String, Long> missingKeyIds;
+
+    private List<JsonWebKey> jsonWebKeys;
+
+    private boolean isInitialized;
+
+    /**
+     * Creates a <code>RefreshingHttpsJwks</code> that will be used by the
+     * {@link RefreshingHttpsJwksVerificationKeyResolver} to resolve new key 
IDs in JWTs.
+     *
+     * @param location  HTTP/HTTPS endpoint from which to retrieve the JWKS 
based on
+     *                  the OAuth/OIDC standard
+     * @param refreshMs The number of milliseconds between refresh passes to 
connect
+     *                  to the OAuth/OIDC JWKS endpoint to retrieve the latest 
set
+     */
+
+    public RefreshingHttpsJwks(String location, long refreshMs) {
+        super(location);
+
+        if (refreshMs <= 0)
+            throw new IllegalArgumentException("JWKS validation key refresh 
configuration value retryWaitMs value must be positive");
+
+        setDefaultCacheDuration(refreshMs);
+
+        this.refreshMs = refreshMs;
+        this.executorService = Executors.newSingleThreadScheduledExecutor();
+        this.missingKeyIds = new LinkedHashMap<String, 
Long>(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<String, Long> 
eldest) {
+                return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES;
+            }
+        };
+    }
+
+    @Override
+    public void init() throws IOException {
+        try {
+            log.debug("init started");
+
+            List<JsonWebKey> localJWKs;
+
+            try {
+                localJWKs = super.getJsonWebKeys();
+            } catch (JoseException e) {
+                throw new IOException("Could not refresh JWKS", e);
+            }
+
+            try {
+                refreshLock.writeLock().lock();
+                this.jsonWebKeys = Collections.unmodifiableList(localJWKs);

Review comment:
       `this.jsonWebKeys` could just be `jsonWebKeys`?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerTest.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Consumer;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.authenticator.TestJaasConfig;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestInstance.Lifecycle;
+import org.junit.jupiter.api.function.Executable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@TestInstance(Lifecycle.PER_CLASS)
+public abstract class OAuthBearerTest {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    protected ObjectMapper mapper = new ObjectMapper();
+
+    protected void assertThrowsWithMessage(Class<? extends Exception> clazz,
+        Executable executable,
+        String substring) {
+        boolean failed = false;
+
+        try {
+            executable.execute();
+        } catch (Throwable t) {
+            failed = true;
+            assertTrue(clazz.isInstance(t), String.format("Test failed by 
exception %s, but expected %s", t.getClass(), clazz));
+
+            assertErrorMessageContains(t.getMessage(), substring);
+        }
+
+        if (!failed)
+            fail("Expected test to fail with " + clazz + " that contains the 
string " + substring);
+    }
+
+    protected void assertErrorMessageContains(String actual, String 
expectedSubstring) {
+        assertTrue(actual.contains(expectedSubstring),
+            String.format("Expected exception message (\"%s\") to contain 
substring (\"%s\")",
+                actual,
+                expectedSubstring));
+    }
+
+    protected void configureHandler(AuthenticateCallbackHandler handler,
+        Map<String, ?> configs,
+        Map<String, Object> jaasConfig) {
+        TestJaasConfig config = new TestJaasConfig();
+        config.createOrUpdateEntry("KafkaClient", 
OAuthBearerLoginModule.class.getName(), jaasConfig);
+        AppConfigurationEntry kafkaClient = 
config.getAppConfigurationEntry("KafkaClient")[0];
+
+        handler.configure(configs,
+            OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
+            Collections.singletonList(kafkaClient));
+    }
+
+    protected String createBase64JsonJwtSection(Consumer<ObjectNode> c) {
+        String json = createJsonJwtSection(c);
+
+        try {
+            return Utils.utf8(Base64.getEncoder().encode(Utils.utf8(json)));
+        } catch (Throwable t) {
+            fail(t);
+
+            // Shouldn't get to here...
+            return null;
+        }
+    }
+
+    protected String createJsonJwtSection(Consumer<ObjectNode> c) {
+        ObjectNode node = mapper.createObjectNode();
+        c.accept(node);
+
+        try {
+            return mapper.writeValueAsString(node);
+        } catch (Throwable t) {
+            fail(t);
+
+            // Shouldn't get to here...
+            return null;
+        }
+    }
+
+    protected Retryable<String> createRetryable(Exception[] attempts) {
+        Iterator<Exception> i = Arrays.asList(attempts).iterator();
+
+        return () -> {
+            Exception e = i.hasNext() ? i.next() : null;
+
+            if (e == null) {
+                return "success!";
+            } else {
+                if (e instanceof IOException)
+                    throw (IOException) e;
+                else if (e instanceof RuntimeException)
+                    throw (RuntimeException) e;
+                else
+                    throw new RuntimeException(e);
+            }
+        };
+    }
+
+    protected HttpURLConnection createHttpURLConnection(String response) 
throws IOException {
+        HttpURLConnection mockedCon = mock(HttpURLConnection.class);
+        when(mockedCon.getURL()).thenReturn(new 
URL("https://www.example.com";));
+        when(mockedCon.getResponseCode()).thenReturn(200);
+        when(mockedCon.getOutputStream()).thenReturn(new 
ByteArrayOutputStream());
+        when(mockedCon.getInputStream()).thenReturn(new 
ByteArrayInputStream(Utils.utf8(response)));
+        return mockedCon;
+    }
+
+    protected File createTempPemDir() throws IOException {

Review comment:
       This seems unused.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/LoginAccessTokenValidator.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.secured;
+
+import static 
org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME;
+import static 
org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException;
+import 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LoginAccessTokenValidator is an implementation of {@link 
AccessTokenValidator} that is used
+ * by the client to perform some rudimentary validation of the JWT access 
token that is received
+ * as part of the response from posting the client credentials to the 
OAuth/OIDC provider's
+ * token endpoint.
+ *
+ * The validation steps performed are:
+ *
+ * <ol>
+ *     <li>
+ *         Basic structural validation of the <code>b64token</code> value as 
defined in
+ *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1";>RFC 6750 
Section 2.1</a>
+ *     </li>
+ *     <li>Basic conversion of the token into an in-memory map</li>
+ *     <li>Presence of scope, <code>exp</code>, subject, and <code>iat</code> 
claims</li>
+ * </ol>
+ */
+
+public class LoginAccessTokenValidator implements AccessTokenValidator {
+
+    private static final Logger log = 
LoggerFactory.getLogger(LoginAccessTokenValidator.class);
+
+    public static final String EXPIRATION_CLAIM_NAME = "exp";
+
+    public static final String ISSUED_AT_CLAIM_NAME = "iat";
+
+    private final String scopeClaimName;
+
+    private final String subClaimName;
+
+    /**
+     * Creates a new LoginAccessTokenValidator that will be used by the client 
for lightweight
+     * validation of the JWT.
+     */
+
+    public LoginAccessTokenValidator() {

Review comment:
       This seems never used?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to