junrao commented on a change in pull request #11284: URL: https://github.com/apache/kafka/pull/11284#discussion_r734831120
########## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/ConfigurationUtils.java ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Locale; +import java.util.Map; +import javax.net.ssl.SSLSocketFactory; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <code>ConfigurationUtils</code> is a utility class to perform basic configuration-related + * logic and is separated out here for easier, more direct testing. 
+ */ + +public class ConfigurationUtils { + + private static final Logger log = LoggerFactory.getLogger(ConfigurationUtils.class); + + private final Map<String, ?> configs; + + private final String prefix; + + public ConfigurationUtils(Map<String, ?> configs) { + this(configs, null); + } + + public ConfigurationUtils(Map<String, ?> configs, String saslMechanism) { Review comment: It seems that the configs passed to the following method already have the listener prefix removed. ` void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries); ` The logic that does that is in ChannelBuilders.create(), where we have the following code. ` Map<String, Object> configs = channelBuilderConfigs(config, listenerName); ` So, the code in the PR is correct. ########## File path: build.gradle ########## @@ -829,6 +829,7 @@ project(':core') { implementation libs.jacksonDataformatCsv implementation libs.jacksonJDK8Datatypes implementation libs.joptSimple + implementation libs.jose4j Review comment: This is fine. It will be useful to check all other projects that depend on clients and see if the jose4j dependency needs to be added explicitly. For example, it's convenient to add it to shell and potentially connect:runtime. ########## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerValidatorCallbackHandler.java ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerExtensionsValidatorCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> + * <code>OAuthBearerValidatorCallbackHandler</code> is an {@link AuthenticateCallbackHandler} that + * accepts {@link OAuthBearerValidatorCallback} and {@link OAuthBearerExtensionsValidatorCallback} + * callbacks to implement OAuth/OIDC validation. This callback handler is intended only to be used + * on the Kafka broker side as it will receive a {@link OAuthBearerValidatorCallback} that includes + * the JWT provided by the Kafka client. That JWT is validated in terms of format, expiration, + * signature, and audience and issuer (if desired). This callback handler is the broker side of the + * OAuth functionality, whereas {@link OAuthBearerLoginCallbackHandler} is used by clients. 
+ * </p> + * + * <p> + * This {@link AuthenticateCallbackHandler} is enabled in the broker configuration by setting the + * {@link org.apache.kafka.common.config.internals.BrokerSecurityConfigs#SASL_SERVER_CALLBACK_HANDLER_CLASS} + * like so: + * + * <code> + * listener.name.<listener name>.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler + * </code> + * </p> + * + * <p> + * The JAAS configuration for OAuth is also needed. If using OAuth for inter-broker communication, + * the options are those specified in {@link OAuthBearerLoginCallbackHandler}. If <b>not</b> using + * OAuth for inter-broker communication, but using it for validation, a dummy JAAS option named + * <code>unsecuredLoginStringClaim_sub</code> is needed: + * + * <code> + * listener.name.<listener name>.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ Review comment: Hmm, are OAuthBearerLoginModule and unsecuredLoginStringClaim_sub needed on the server side in the secure mode? The latter wasn't mentioned in the KIP. ########## File path: clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerLoginCallbackHandlerTest.java ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse; +import org.apache.kafka.common.utils.Utils; +import org.junit.jupiter.api.Test; + +public class OAuthBearerLoginCallbackHandlerTest extends OAuthBearerTest { + + @Test + public void testHandleTokenCallback() throws 
Exception { + Map<String, ?> configs = getSaslConfigs(); + AccessTokenBuilder builder = new AccessTokenBuilder(); + String accessToken = builder.build(); + AccessTokenRetriever accessTokenRetriever = () -> accessToken; + + OAuthBearerLoginCallbackHandler handler = createHandler(accessTokenRetriever, configs); + + try { + OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback(); + handler.handle(new Callback[] {callback}); + + assertNotNull(callback.token()); + OAuthBearerToken token = callback.token(); + assertEquals(accessToken, token.value()); + assertEquals(builder.subject(), token.principalName()); + assertEquals(builder.expirationSeconds() * 1000, token.lifetimeMs()); + assertEquals(builder.issuedAtSeconds() * 1000, token.startTimeMs()); + } finally { + handler.close(); + } + } + + @Test + public void testHandleSaslExtensionsCallback() throws Exception { + OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler(); + Map<String, ?> configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com"); + Map<String, Object> jaasConfig = new HashMap<>(); + jaasConfig.put(CLIENT_ID_CONFIG, "an ID"); + jaasConfig.put(CLIENT_SECRET_CONFIG, "a secret"); + jaasConfig.put("extension_foo", "1"); + jaasConfig.put("extension_bar", 2); + jaasConfig.put("EXTENSION_baz", "3"); + configureHandler(handler, configs, jaasConfig); + + try { + SaslExtensionsCallback callback = new SaslExtensionsCallback(); + handler.handle(new Callback[]{callback}); + + assertNotNull(callback.extensions()); + Map<String, String> extensions = callback.extensions().map(); + assertEquals("1", extensions.get("foo")); + assertEquals("2", extensions.get("bar")); + assertNull(extensions.get("baz")); + assertEquals(2, extensions.size()); + } finally { + handler.close(); + } + } + + @Test + public void testHandleSaslExtensionsCallbackWithInvalidExtension() { + String illegalKey = "extension_" + OAuthBearerClientInitialResponse.AUTH_KEY; + + 
OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler(); + Map<String, ?> configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://www.example.com"); + Map<String, Object> jaasConfig = new HashMap<>(); + jaasConfig.put(CLIENT_ID_CONFIG, "an ID"); + jaasConfig.put(CLIENT_SECRET_CONFIG, "a secret"); + jaasConfig.put(illegalKey, "this key isn't allowed per OAuthBearerClientInitialResponse.validateExtensions"); + configureHandler(handler, configs, jaasConfig); + + try { + SaslExtensionsCallback callback = new SaslExtensionsCallback(); + assertThrowsWithMessage(ConfigException.class, + () -> handler.handle(new Callback[]{callback}), + "Extension name " + OAuthBearerClientInitialResponse.AUTH_KEY + " is invalid"); + } finally { + handler.close(); + } + } + + @Test + public void testInvalidCallbackGeneratesUnsupportedCallbackException() { + Map<String, ?> configs = getSaslConfigs(); + OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler(); + AccessTokenRetriever accessTokenRetriever = () -> "foo"; + AccessTokenValidator accessTokenValidator = AccessTokenValidatorFactory.create(configs); + handler.init(accessTokenRetriever, accessTokenValidator); + + try { + Callback unsupportedCallback = new Callback() { }; + assertThrows(UnsupportedCallbackException.class, () -> handler.handle(new Callback[]{unsupportedCallback})); + } finally { + handler.close(); + } + } + + @Test + public void testInvalidAccessToken() throws Exception { + testInvalidAccessToken("this isn't valid", "Malformed JWT provided"); + testInvalidAccessToken("this.isn't.valid", "malformed Base64 URL encoded value"); + testInvalidAccessToken(createAccessKey("this", "isn't", "valid"), "malformed JSON"); + testInvalidAccessToken(createAccessKey("{}", "{}", "{}"), "exp value must be non-null"); + } + + @Test + public void testMissingAccessToken() { + AccessTokenRetriever accessTokenRetriever = () -> { + throw new IOException("The token endpoint response 
access_token value must be non-null"); + }; + Map<String, ?> configs = getSaslConfigs(); + OAuthBearerLoginCallbackHandler handler = createHandler(accessTokenRetriever, configs); + + try { + OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback(); + assertThrowsWithMessage(IOException.class, + () -> handler.handle(new Callback[]{callback}), + "token endpoint response access_token value must be non-null"); + } finally { + handler.close(); + } + } + + @Test + public void testNotConfigured() { + OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler(); + assertThrowsWithMessage(IllegalStateException.class, () -> handler.handle(new Callback[] {}), "first call the configure or init method"); + } + + @Test + public void testConfigureWithAccessTokenFile() throws Exception { + String expected = "{}"; + + File tmpDir = createTempDir("access-token"); + File accessTokenFile = createTempFile(tmpDir, "access-token-", ".json", expected); + + OAuthBearerLoginCallbackHandler handler = new OAuthBearerLoginCallbackHandler(); + Map<String, ?> configs = getSaslConfigs(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, accessTokenFile.toURI().toString()); + Map<String, Object> jaasConfigs = Collections.emptyMap(); + configureHandler(handler, configs, jaasConfigs); Review comment: This test doesn't seem to verify anything. Ditto for the next test. ########## File path: clients/src/test/resources/log4j.properties ########## @@ -12,10 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=OFF, stdout +log4j.rootLogger=INFO, stdout Review comment: Are the changes in this file needed? ########## File path: tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTest.java ########## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER; +import static 
org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_DOC; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_DOC; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_DOC; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; 
+import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetriever; +import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetrieverFactory; +import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidator; +import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidatorFactory; +import org.apache.kafka.common.security.oauthbearer.secured.CloseableVerificationKeyResolver; +import org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.secured.VerificationKeyResolverFactory; +import org.apache.kafka.common.utils.Exit; + +public class OAuthCompatibilityTest { + + public static void main(String[] args) { + String description = String.format( + "This tool is used to verify OAuth/OIDC provider compatibility.%n%n" + + "To use, first export KAFKA_OPTS with Java system properties that match%n" + + "your OAuth/OIDC configuration. 
Next, run the following script to%n" + + "execute the test:%n%n" + + " ./bin/kafka-run-class.sh %s" + + "%n%n" + + "Please refer to the following source files for OAuth/OIDC client and%n" + + "broker configuration options:" + + "%n%n" + + " %s%n" + + " %s", + OAuthCompatibilityTest.class.getName(), + SaslConfigs.class.getName(), + OAuthBearerLoginCallbackHandler.class.getName()); + + ArgumentParser parser = ArgumentParsers + .newArgumentParser("oauth-compatibility-test") + .defaultHelp(true) + .description(description); + + parser.addArgument("--connect-timeout-ms") + .type(Long.class) + .dest("connectTimeoutMs") + .help(SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC); + parser.addArgument("--read-timeout-ms") + .type(Long.class) + .dest("readTimeoutMs") + .help(SASL_LOGIN_READ_TIMEOUT_MS_DOC); + parser.addArgument("--retry-backoff-ms") + .type(Long.class) + .dest("retryBackoffMs") + .help(SASL_LOGIN_RETRY_BACKOFF_MS_DOC); + parser.addArgument("--retry-backoff-max-ms") + .type(Long.class) + .dest("retryBackoffMax") + .help(SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC); + parser.addArgument("--scope-claim-name") + .dest("scopeClaimName") + .help(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC); + parser.addArgument("--sub-claim-name") + .dest("subClaimName") + .help(SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC); + parser.addArgument("--token-endpoint-url") + .dest("tokenEndpointUrl") + .help(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC); + parser.addArgument("--jwks-endpoint-url") + .dest("jwksEndpointUrl") + .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC); + parser.addArgument("--jwks-endpoint-refresh-ms") + .type(Long.class) + .dest("jwksEndpointRefreshMs") + .help(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC); + parser.addArgument("--clock-skew-seconds") + .type(Integer.class) + .dest("clockSkewSeconds") + .help(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC); + parser.addArgument("--expected-audience") + .dest("expectedAudience") + .help(SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC); + 
parser.addArgument("--expected-issuer") + .dest("expectedIssuer") + .help(SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC); + + parser.addArgument("--client-id") + .dest("clientId") + .help(CLIENT_ID_DOC); + parser.addArgument("--client-secret") + .dest("clientSecret") + .help(CLIENT_SECRET_DOC); + parser.addArgument("--scope") + .dest("scope") + .help(SCOPE_DOC); + + Namespace namespace; + + try { + namespace = parser.parseArgs(args); + } catch (ArgumentParserException e) { + parser.handleError(e); + Exit.exit(1); + return; + } + + Map<String, ?> configs = getConfigs(namespace); + Map<String, Object> jaasConfigs = getJaasConfigs(namespace); + + try { + String accessToken; + + { + // Client side... + try (AccessTokenRetriever atr = AccessTokenRetrieverFactory.create(configs, jaasConfigs)) { + atr.init(); + AccessTokenValidator atv = AccessTokenValidatorFactory.create(configs); + System.out.println("PASSED 1/5: client configuration"); + + accessToken = atr.retrieve(); + System.out.println("PASSED 2/5: client JWT retrieval"); + + atv.validate(accessToken); + System.out.println("PASSED 3/5: client JWT validation"); + } + } + + { + // Broker side... 
+ try (CloseableVerificationKeyResolver vkr = VerificationKeyResolverFactory.create(configs, jaasConfigs)) { + vkr.init(); + AccessTokenValidator atv = AccessTokenValidatorFactory.create(configs, vkr); + System.out.println("PASSED 4/5: broker configuration"); + + atv.validate(accessToken); + System.out.println("PASSED 5/5: broker JWT validation"); + } + } + + System.out.println("SUCCESS"); + Exit.exit(0); + } catch (Throwable t) { + System.out.println("FAILED:"); + t.printStackTrace(); + + if (t instanceof ConfigException) { + System.out.printf("%n"); + parser.printHelp(); + } + + Exit.exit(1); + } + } + + private static Map<String, ?> getConfigs(Namespace namespace) { + Map<String, Object> c = new HashMap<>(); + maybeAddLong(namespace, "connectTimeoutMs", c, SASL_LOGIN_CONNECT_TIMEOUT_MS); Review comment: This and the next one seem to be of type int. ########## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/RefreshingHttpsJwks.java ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.jose4j.jwk.HttpsJwks; +import org.jose4j.jwk.JsonWebKey; +import org.jose4j.lang.JoseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link HttpsJwks} that will periodically refresh the JWKS cache to reduce or + * even prevent HTTP/HTTPS traffic in the hot path of validation. It is assumed that it's + * possible to receive a JWT that contains a <code>kid</code> that points to yet-unknown JWK, + * thus requiring a connection to the OAuth/OIDC provider to be made. Hopefully, in practice, + * keys are made available for some amount of time before they're used within JWTs. + * + * This instance is created and provided to the + * {@link org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver} that is used when using + * an HTTP-/HTTPS-based {@link org.jose4j.keys.resolvers.VerificationKeyResolver}, which is then + * provided to the {@link ValidatorAccessTokenValidator} to use in validating the signature of + * a JWT. 
+ * + * @see org.jose4j.keys.resolvers.HttpsJwksVerificationKeyResolver + * @see org.jose4j.keys.resolvers.VerificationKeyResolver + * @see ValidatorAccessTokenValidator + */ + +public final class RefreshingHttpsJwks extends HttpsJwks implements Initable, Closeable { + + private static final Logger log = LoggerFactory.getLogger(RefreshingHttpsJwks.class); + + private static final int MISSING_KEY_ID_CACHE_MAX_ENTRIES = 16; + + private static final long MISSING_KEY_ID_CACHE_IN_FLIGHT_MS = 60000; + + private static final int MISSING_KEY_ID_MAX_KEY_LENGTH = 1000; + + private static final int SHUTDOWN_TIMEOUT = 10; + + private static final TimeUnit SHUTDOWN_TIME_UNIT = TimeUnit.SECONDS; + + private final ScheduledExecutorService executorService; + + private final long refreshMs; + + private final ReadWriteLock refreshLock = new ReentrantReadWriteLock(); + + private final Map<String, Long> missingKeyIds; + + private List<JsonWebKey> jsonWebKeys; + + private boolean isInitialized; + + /** + * Creates a <code>RefreshingHttpsJwks</code> that will be used by the + * {@link RefreshingHttpsJwksVerificationKeyResolver} to resolve new key IDs in JWTs. 
+ * + * @param location HTTP/HTTPS endpoint from which to retrieve the JWKS based on + * the OAuth/OIDC standard + * @param refreshMs The number of milliseconds between refresh passes to connect + * to the OAuth/OIDC JWKS endpoint to retrieve the latest set + */ + + public RefreshingHttpsJwks(String location, long refreshMs) { + super(location); + + if (refreshMs <= 0) + throw new IllegalArgumentException("JWKS validation key refresh configuration value retryWaitMs value must be positive"); + + setDefaultCacheDuration(refreshMs); + + this.refreshMs = refreshMs; + this.executorService = Executors.newSingleThreadScheduledExecutor(); + this.missingKeyIds = new LinkedHashMap<String, Long>(MISSING_KEY_ID_CACHE_MAX_ENTRIES, .75f, true) { + @Override + protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) { + return this.size() > MISSING_KEY_ID_CACHE_MAX_ENTRIES; + } + }; + } + + @Override + public void init() throws IOException { + try { + log.debug("init started"); + + List<JsonWebKey> localJWKs; + + try { + localJWKs = super.getJsonWebKeys(); + } catch (JoseException e) { + throw new IOException("Could not refresh JWKS", e); + } + + try { + refreshLock.writeLock().lock(); + this.jsonWebKeys = Collections.unmodifiableList(localJWKs); Review comment: `this.jsonWebKeys` could just be `jsonWebKeys`? ########## File path: clients/src/test/java/org/apache/kafka/common/security/oauthbearer/secured/OAuthBearerTest.java ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import java.util.function.Consumer; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.authenticator.TestJaasConfig; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.utils.Utils; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; +import org.junit.jupiter.api.function.Executable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@TestInstance(Lifecycle.PER_CLASS) +public abstract class OAuthBearerTest { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + 
+ protected ObjectMapper mapper = new ObjectMapper(); + + protected void assertThrowsWithMessage(Class<? extends Exception> clazz, + Executable executable, + String substring) { + boolean failed = false; + + try { + executable.execute(); + } catch (Throwable t) { + failed = true; + assertTrue(clazz.isInstance(t), String.format("Test failed by exception %s, but expected %s", t.getClass(), clazz)); + + assertErrorMessageContains(t.getMessage(), substring); + } + + if (!failed) + fail("Expected test to fail with " + clazz + " that contains the string " + substring); + } + + protected void assertErrorMessageContains(String actual, String expectedSubstring) { + assertTrue(actual.contains(expectedSubstring), + String.format("Expected exception message (\"%s\") to contain substring (\"%s\")", + actual, + expectedSubstring)); + } + + protected void configureHandler(AuthenticateCallbackHandler handler, + Map<String, ?> configs, + Map<String, Object> jaasConfig) { + TestJaasConfig config = new TestJaasConfig(); + config.createOrUpdateEntry("KafkaClient", OAuthBearerLoginModule.class.getName(), jaasConfig); + AppConfigurationEntry kafkaClient = config.getAppConfigurationEntry("KafkaClient")[0]; + + handler.configure(configs, + OAuthBearerLoginModule.OAUTHBEARER_MECHANISM, + Collections.singletonList(kafkaClient)); + } + + protected String createBase64JsonJwtSection(Consumer<ObjectNode> c) { + String json = createJsonJwtSection(c); + + try { + return Utils.utf8(Base64.getEncoder().encode(Utils.utf8(json))); + } catch (Throwable t) { + fail(t); + + // Shouldn't get to here... + return null; + } + } + + protected String createJsonJwtSection(Consumer<ObjectNode> c) { + ObjectNode node = mapper.createObjectNode(); + c.accept(node); + + try { + return mapper.writeValueAsString(node); + } catch (Throwable t) { + fail(t); + + // Shouldn't get to here... 
+ return null; + } + } + + protected Retryable<String> createRetryable(Exception[] attempts) { + Iterator<Exception> i = Arrays.asList(attempts).iterator(); + + return () -> { + Exception e = i.hasNext() ? i.next() : null; + + if (e == null) { + return "success!"; + } else { + if (e instanceof IOException) + throw (IOException) e; + else if (e instanceof RuntimeException) + throw (RuntimeException) e; + else + throw new RuntimeException(e); + } + }; + } + + protected HttpURLConnection createHttpURLConnection(String response) throws IOException { + HttpURLConnection mockedCon = mock(HttpURLConnection.class); + when(mockedCon.getURL()).thenReturn(new URL("https://www.example.com")); + when(mockedCon.getResponseCode()).thenReturn(200); + when(mockedCon.getOutputStream()).thenReturn(new ByteArrayOutputStream()); + when(mockedCon.getInputStream()).thenReturn(new ByteArrayInputStream(Utils.utf8(response))); + return mockedCon; + } + + protected File createTempPemDir() throws IOException { Review comment: This seems unused. ########## File path: clients/src/main/java/org/apache/kafka/common/security/oauthbearer/secured/LoginAccessTokenValidator.java ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.security.oauthbearer.secured; + +import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME; +import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException; +import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * LoginAccessTokenValidator is an implementation of {@link AccessTokenValidator} that is used + * by the client to perform some rudimentary validation of the JWT access token that is received + * as part of the response from posting the client credentials to the OAuth/OIDC provider's + * token endpoint. + * + * The validation steps performed are: + * + * <ol> + * <li> + * Basic structural validation of the <code>b64token</code> value as defined in + * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section 2.1</a> + * </li> + * <li>Basic conversion of the token into an in-memory map</li> + * <li>Presence of scope, <code>exp</code>, subject, and <code>iat</code> claims</li> + * </ol> + */ + +public class LoginAccessTokenValidator implements AccessTokenValidator { + + private static final Logger log = LoggerFactory.getLogger(LoginAccessTokenValidator.class); + + public static final String EXPIRATION_CLAIM_NAME = "exp"; + + public static final String ISSUED_AT_CLAIM_NAME = "iat"; + + private final String scopeClaimName; + + private final String subClaimName; + + /** + * Creates a new LoginAccessTokenValidator that will be used by the client for lightweight + * validation of the JWT. 
+ */ + + public LoginAccessTokenValidator() { Review comment: This seems never used? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org