lianetm commented on code in PR #19754: URL: https://github.com/apache/kafka/pull/19754#discussion_r2114553642
########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -129,6 +130,186 @@ public class SaslConfigs { + " authentication provider." + LOGIN_EXPONENTIAL_BACKOFF_NOTE; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "sasl.oauthbearer.jwt.retriever.class"; + public static final String DEFAULT_SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever"; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtRetriever</code> implementation used to" + + " request tokens from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.FileJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "sasl.oauthbearer.jwt.validator.class"; + public static final String DEFAULT_BROKER_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator"; + public static final String DEFAULT_CLIENT_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.ClientJwtValidator"; + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtValidator</code> implementation used to" + + " validate the JWT from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtValidator</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_GRANT_TYPE = "sasl.oauthbearer.grant.type"; + public static final String DEFAULT_SASL_OAUTHBEARER_GRANT_TYPE = "client_credentials"; + public static final String SASL_OAUTHBEARER_GRANT_TYPE_DOC = "The OAuth grant type to use when communicating with the identity provider. On the whole, the" + + " OAuth layer does not rely on this value and expects it to be used and/or verified for correctness by the <code>JwtRetriever</code> implementation." + + " The default value of <code>client_credentials</code> maintains backward compatibility. The built-in grant types are:" + + "<ul>" + + "<li><code>client_credentials</code></li>" + + "<li><code>urn:ietf:params:oauth:grant-type:jwt-bearer</code></li>" + + "</ul>" + + "<p>The OAuth code in Apache Kafka does not limit the values that are used. A user can write a custom <code>JwtRetriever</code> implementation that uses" + + " a completely different grant type, if desired.</p>"; + + public static final String SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope"; + public static final String SASL_OAUTHBEARER_SCOPE_DOC = "<p>This is the level of access a client application is granted to a resource or API which is" + + " included in the token request. If provided, it should match one or more scopes configured in the identity provider.</p>" + + "<p>" + + "The scope was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>scope</code>." + + " For backward compatibility, the <code>scope</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." Review Comment: `sasl.jaas.config` is not really marked as deprecated yet, right? Do we plan to do it as follow-up? ########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -129,6 +130,186 @@ public class SaslConfigs { + " authentication provider." + LOGIN_EXPONENTIAL_BACKOFF_NOTE; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "sasl.oauthbearer.jwt.retriever.class"; + public static final String DEFAULT_SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever"; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtRetriever</code> implementation used to" + + " request tokens from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.FileJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "sasl.oauthbearer.jwt.validator.class"; + public static final String DEFAULT_BROKER_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator"; + public static final String DEFAULT_CLIENT_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.ClientJwtValidator"; + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtValidator</code> implementation used to" + + " validate the JWT from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" Review Comment: do we want to refer to the "default validator" here instead of "default configuration value"?? (referring to the "default configuration value" is confusing, because there is really no config for the `DefaultJwtValidator` right? (only for DEFAULT_BROKER.. and DEFAULT_CLIENT...) ########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -129,6 +130,186 @@ public class SaslConfigs { + " authentication provider." + LOGIN_EXPONENTIAL_BACKOFF_NOTE; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "sasl.oauthbearer.jwt.retriever.class"; + public static final String DEFAULT_SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever"; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtRetriever</code> implementation used to" + + " request tokens from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.FileJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "sasl.oauthbearer.jwt.validator.class"; + public static final String DEFAULT_BROKER_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator"; + public static final String DEFAULT_CLIENT_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.ClientJwtValidator"; + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtValidator</code> implementation used to" + + " validate the JWT from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtValidator</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_GRANT_TYPE = "sasl.oauthbearer.grant.type"; + public static final String DEFAULT_SASL_OAUTHBEARER_GRANT_TYPE = "client_credentials"; + public static final String SASL_OAUTHBEARER_GRANT_TYPE_DOC = "The OAuth grant type to use when communicating with the identity provider. On the whole, the" + + " OAuth layer does not rely on this value and expects it to be used and/or verified for correctness by the <code>JwtRetriever</code> implementation." + + " The default value of <code>client_credentials</code> maintains backward compatibility. The built-in grant types are:" + + "<ul>" + + "<li><code>client_credentials</code></li>" + + "<li><code>urn:ietf:params:oauth:grant-type:jwt-bearer</code></li>" + + "</ul>" + + "<p>The OAuth code in Apache Kafka does not limit the values that are used. A user can write a custom <code>JwtRetriever</code> implementation that uses" + + " a completely different grant type, if desired.</p>"; + + public static final String SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope"; + public static final String SASL_OAUTHBEARER_SCOPE_DOC = "<p>This is the level of access a client application is granted to a resource or API which is" + + " included in the token request. If provided, it should match one or more scopes configured in the identity provider.</p>" + + "<p>" + + "The scope was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>scope</code>." + + " For backward compatibility, the <code>scope</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.scope</code> from configuration</li>" + + "<li><code>scope</code> from JAAS</li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID = "sasl.oauthbearer.client.credentials.client.id"; + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID_DOC = "<p>The ID (defined in/by the OAuth identity provider) to identify the client" + + " requesting the token.</p>" + + "<p>" + + "The client ID was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>clientId</code>." + + " For backward compatibility, the <code>clientId</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.client.credentials.client.id</code> from configuration</li>" + + "<li><code>clientId</code> from JAAS</li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET = "sasl.oauthbearer.client.credentials.client.secret"; + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET_DOC = "<p>The secret (defined by either the user or preassigned, depending on the" + + " identity provider) of the client requesting the token.</p>" + + "<p>" + + "The client secret was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>clientSecret</code>." + + " For backward compatibility, the <code>clientSecret</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.client.credentials.client.secret</code> from configuration</li>" + + "<li><code>clientSecret</code> from JAAS</li>" + + "</ul>"; + + private static final String ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE = "<p><em>Note</em>: If a value for <code>sasl.oauthbearer.assertion.file</code> is provided," + + " this configuration will be ignored.</p>"; + + public static final String SASL_OAUTHBEARER_ASSERTION_ALGORITHM = "sasl.oauthbearer.assertion.algorithm"; + public static final String DEFAULT_SASL_OAUTHBEARER_ASSERTION_ALGORITHM = "RS256"; + public static final String SASL_OAUTHBEARER_ASSERTION_ALGORITHM_DOC = "<p>The algorithm the Apache Kafka client should use to sign the assertion sent" + + " to the identity provider. It is also used as the value of the OAuth <code>alg</code> (Algorithm) header in the JWT assertion.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD = "sasl.oauthbearer.assertion.claim.aud"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD_DOC = "<p>The JWT <code>aud</code> (Audience) claim which will be included in the " + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS = "sasl.oauthbearer.assertion.claim.exp.seconds"; + public static final int DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS = 300; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS_DOC = "<p>The number of seconds <em>in the future</em> for which the JWT is valid." + + " The value is used to determine the JWT <code>exp</code> (Expiration) claim based on the current system time when the JWT is created.</p>" + + "<p>The formula to generate the <code>exp</code> claim is very simple:</p>" + + "<pre>" + + "Let:\n\n" + + " x = the current timestamp in seconds, on client\n" + + " y = the value of this configuration\n" + + "\n" + + "Then:\n\n" + + " exp = x + y\n" + + "</pre>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_ISS = "sasl.oauthbearer.assertion.claim.iss"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_ISS_DOC = "<p>The value to be used as the <code>iss</code> (Issuer) claim which will be included in the" + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE = "sasl.oauthbearer.assertion.claim.jti.include"; + public static final boolean DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE = false; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE_DOC = "<p>Flag that determines if the JWT assertion should generate a unique ID for the" + + " JWT and include it in the <code>jti</code> (JWT ID) claim.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS = "sasl.oauthbearer.assertion.claim.nbf.seconds"; + public static final int DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS = 60; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS_DOC = "<p>The number of seconds <em>in the past</em> from which the JWT is valid." + + " The value is used to determine the JWT <code>nbf</code> (Not Before) claim based on the current system time when the JWT is created.</p>" + + "<p>The formula to generate the <code>nbf</code> claim is very simple:</p>" + + "<pre>" + + "Let:\n\n" + + " x = the current timestamp in seconds, on client\n" + + " y = the value of this configuration\n" + + "\n" + + "Then:\n\n" + + " nbf = x - y\n" + + "</pre>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB = "sasl.oauthbearer.assertion.claim.sub"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB_DOC = "<p>The value to be used as the <code>sub</code> (Subject) claim which will be included in the" + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_FILE = "sasl.oauthbearer.assertion.file"; + public static final String SASL_OAUTHBEARER_ASSERTION_FILE_DOC = "<p>File that contains a <em>pre-generated</em> JWT assertion.</p>" + + "<p>The underlying implementation caches the file contents to avoid the performance hit of loading the file on each access. The caching mechanism will detect when" + + "the file changes to allow for the file to be reloaded on modifications. This allows for "live" assertion rotation without restarting the Kafka client.</p>" + + "<p>The file is the assertion in the serialized, three part JWT format:</p>" Review Comment: should it be? ```suggestion + "<p>The file contains the assertion in the serialized, three part JWT format:</p>" ``` ########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -129,6 +130,186 @@ public class SaslConfigs { + " authentication provider." + LOGIN_EXPONENTIAL_BACKOFF_NOTE; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "sasl.oauthbearer.jwt.retriever.class"; + public static final String DEFAULT_SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever"; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtRetriever</code> implementation used to" + + " request tokens from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" Review Comment: Doesn't look like we use tags in config docs like these elsewhere (I may be missing something), and it surely shows polluted with the literal tags when looking at the java doc for this var usages. Is this intentional? ########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -129,6 +130,186 @@ public class SaslConfigs { + " authentication provider." + LOGIN_EXPONENTIAL_BACKOFF_NOTE; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "sasl.oauthbearer.jwt.retriever.class"; + public static final String DEFAULT_SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever"; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtRetriever</code> implementation used to" + + " request tokens from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.FileJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "sasl.oauthbearer.jwt.validator.class"; + public static final String DEFAULT_BROKER_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator"; + public static final String DEFAULT_CLIENT_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.ClientJwtValidator"; + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtValidator</code> implementation used to" + + " validate the JWT from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtValidator</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_GRANT_TYPE = "sasl.oauthbearer.grant.type"; + public static final String DEFAULT_SASL_OAUTHBEARER_GRANT_TYPE = "client_credentials"; + public static final String SASL_OAUTHBEARER_GRANT_TYPE_DOC = "The OAuth grant type to use when communicating with the identity provider. On the whole, the" + + " OAuth layer does not rely on this value and expects it to be used and/or verified for correctness by the <code>JwtRetriever</code> implementation." + + " The default value of <code>client_credentials</code> maintains backward compatibility. The built-in grant types are:" + + "<ul>" + + "<li><code>client_credentials</code></li>" + + "<li><code>urn:ietf:params:oauth:grant-type:jwt-bearer</code></li>" + + "</ul>" + + "<p>The OAuth code in Apache Kafka does not limit the values that are used. A user can write a custom <code>JwtRetriever</code> implementation that uses" + + " a completely different grant type, if desired.</p>"; + + public static final String SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope"; + public static final String SASL_OAUTHBEARER_SCOPE_DOC = "<p>This is the level of access a client application is granted to a resource or API which is" + + " included in the token request. If provided, it should match one or more scopes configured in the identity provider.</p>" + + "<p>" + + "The scope was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>scope</code>." + + " For backward compatibility, the <code>scope</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.scope</code> from configuration</li>" + + "<li><code>scope</code> from JAAS</li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID = "sasl.oauthbearer.client.credentials.client.id"; + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID_DOC = "<p>The ID (defined in/by the OAuth identity provider) to identify the client" + + " requesting the token.</p>" + + "<p>" + + "The client ID was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>clientId</code>." + + " For backward compatibility, the <code>clientId</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.client.credentials.client.id</code> from configuration</li>" + + "<li><code>clientId</code> from JAAS</li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET = "sasl.oauthbearer.client.credentials.client.secret"; + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET_DOC = "<p>The secret (defined by either the user or preassigned, depending on the" + + " identity provider) of the client requesting the token.</p>" + + "<p>" + + "The client secret was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>clientSecret</code>." + + " For backward compatibility, the <code>clientSecret</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.client.credentials.client.secret</code> from configuration</li>" + + "<li><code>clientSecret</code> from JAAS</li>" + + "</ul>"; + + private static final String ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE = "<p><em>Note</em>: If a value for <code>sasl.oauthbearer.assertion.file</code> is provided," + + " this configuration will be ignored.</p>"; + + public static final String SASL_OAUTHBEARER_ASSERTION_ALGORITHM = "sasl.oauthbearer.assertion.algorithm"; + public static final String DEFAULT_SASL_OAUTHBEARER_ASSERTION_ALGORITHM = "RS256"; + public static final String SASL_OAUTHBEARER_ASSERTION_ALGORITHM_DOC = "<p>The algorithm the Apache Kafka client should use to sign the assertion sent" + + " to the identity provider. It is also used as the value of the OAuth <code>alg</code> (Algorithm) header in the JWT assertion.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD = "sasl.oauthbearer.assertion.claim.aud"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD_DOC = "<p>The JWT <code>aud</code> (Audience) claim which will be included in the " + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS = "sasl.oauthbearer.assertion.claim.exp.seconds"; + public static final int DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS = 300; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS_DOC = "<p>The number of seconds <em>in the future</em> for which the JWT is valid." + + " The value is used to determine the JWT <code>exp</code> (Expiration) claim based on the current system time when the JWT is created.</p>" + + "<p>The formula to generate the <code>exp</code> claim is very simple:</p>" + + "<pre>" + + "Let:\n\n" + + " x = the current timestamp in seconds, on client\n" + + " y = the value of this configuration\n" + + "\n" + + "Then:\n\n" + + " exp = x + y\n" + + "</pre>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_ISS = "sasl.oauthbearer.assertion.claim.iss"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_ISS_DOC = "<p>The value to be used as the <code>iss</code> (Issuer) claim which will be included in the" + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE = "sasl.oauthbearer.assertion.claim.jti.include"; + public static final boolean DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE = false; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE_DOC = "<p>Flag that determines if the JWT assertion should generate a unique ID for the" + + " JWT and include it in the <code>jti</code> (JWT ID) claim.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS = "sasl.oauthbearer.assertion.claim.nbf.seconds"; + public static final int DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS = 60; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS_DOC = "<p>The number of seconds <em>in the past</em> from which the JWT is valid." + + " The value is used to determine the JWT <code>nbf</code> (Not Before) claim based on the current system time when the JWT is created.</p>" + + "<p>The formula to generate the <code>nbf</code> claim is very simple:</p>" + + "<pre>" + + "Let:\n\n" + + " x = the current timestamp in seconds, on client\n" + + " y = the value of this configuration\n" + + "\n" + + "Then:\n\n" + + " nbf = x - y\n" + + "</pre>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB = "sasl.oauthbearer.assertion.claim.sub"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB_DOC = "<p>The value to be used as the <code>sub</code> (Subject) claim which will be included in the" + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_FILE = "sasl.oauthbearer.assertion.file"; + public static final String SASL_OAUTHBEARER_ASSERTION_FILE_DOC = "<p>File that contains a <em>pre-generated</em> JWT assertion.</p>" + + "<p>The underlying implementation caches the file contents to avoid the performance hit of loading the file on each access. The caching mechanism will detect when" + + "the file changes to allow for the file to be reloaded on modifications. This allows for "live" assertion rotation without restarting the Kafka client.</p>" + + "<p>The file is the assertion in the serialized, three part JWT format:</p>" + + "<ol>" + + "<li>The <em>header</em> section is a base 64-encoded JWT header that contains values like <code>alg</code> (Algorithm)," + + " <code>typ</code> (Type, always the literal value <code>JWT</code>), etc.</li>" + + "<li>The <em>payload</em> section includes the base 64-encoded set of JWT claims, such as <code>aud</code> (Audience), <code>iss</code> (Issuer)," + + " <code>sub</code> (Subject), etc.</li>" + + "<li>The <em>signature</em> section is the concatenated <em>header</em> and <em>payload</em> sections that was signed using a private key</li>" + + "</ol>" + + "<p>See <a href=\"https://datatracker.ietf.org/doc/html/rfc7519\">RFC 7519</a> and <a href=\"https://datatracker.ietf.org/doc/html/rfc7515\">RFC 7515</a>" + + " for more details on the JWT and JWS formats.</p>" + + "<p><em>Note</em>: If a value for <code>sasl.oauthbearer.assertion.file</code> is provided, all other" + + " <code>sasl.oauthbearer.assertion.</code>* configuration are ignored.</p>"; Review Comment: configurations are ########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -129,6 +130,186 @@ public class SaslConfigs { + " authentication provider." + LOGIN_EXPONENTIAL_BACKOFF_NOTE; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "sasl.oauthbearer.jwt.retriever.class"; + public static final String DEFAULT_SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever"; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtRetriever</code> implementation used to" + + " request tokens from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.FileJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "sasl.oauthbearer.jwt.validator.class"; + public static final String DEFAULT_BROKER_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator"; + public static final String DEFAULT_CLIENT_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.ClientJwtValidator"; + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtValidator</code> implementation used to" + + " validate the JWT from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtValidator</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_GRANT_TYPE = "sasl.oauthbearer.grant.type"; + public static final String DEFAULT_SASL_OAUTHBEARER_GRANT_TYPE = "client_credentials"; + public static final String SASL_OAUTHBEARER_GRANT_TYPE_DOC = "The OAuth grant type to use when communicating with the identity provider. On the whole, the" + + " OAuth layer does not rely on this value and expects it to be used and/or verified for correctness by the <code>JwtRetriever</code> implementation." + + " The default value of <code>client_credentials</code> maintains backward compatibility. The built-in grant types are:" + + "<ul>" + + "<li><code>client_credentials</code></li>" + + "<li><code>urn:ietf:params:oauth:grant-type:jwt-bearer</code></li>" + + "</ul>" + + "<p>The OAuth code in Apache Kafka does not limit the values that are used. A user can write a custom <code>JwtRetriever</code> implementation that uses" + + " a completely different grant type, if desired.</p>"; + + public static final String SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope"; + public static final String SASL_OAUTHBEARER_SCOPE_DOC = "<p>This is the level of access a client application is granted to a resource or API which is" + + " included in the token request. If provided, it should match one or more scopes configured in the identity provider.</p>" + + "<p>" + + "The scope was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>scope</code>." + + " For backward compatibility, the <code>scope</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.scope</code> from configuration</li>" + + "<li><code>scope</code> from JAAS</li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID = "sasl.oauthbearer.client.credentials.client.id"; + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID_DOC = "<p>The ID (defined in/by the OAuth identity provider) to identify the client" + + " requesting the token.</p>" + + "<p>" + + "The client ID was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>clientId</code>." + + " For backward compatibility, the <code>clientId</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.client.credentials.client.id</code> from configuration</li>" + + "<li><code>clientId</code> from JAAS</li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET = "sasl.oauthbearer.client.credentials.client.secret"; + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET_DOC = "<p>The secret (defined by either the user or preassigned, depending on the" + + " identity provider) of the client requesting the token.</p>" + + "<p>" + + "The client secret was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>clientSecret</code>." + + " For backward compatibility, the <code>clientSecret</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.client.credentials.client.secret</code> from configuration</li>" + + "<li><code>clientSecret</code> from JAAS</li>" + + "</ul>"; + + private static final String ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE = "<p><em>Note</em>: If a value for <code>sasl.oauthbearer.assertion.file</code> is provided," + + " this configuration will be ignored.</p>"; + + public static final String SASL_OAUTHBEARER_ASSERTION_ALGORITHM = "sasl.oauthbearer.assertion.algorithm"; + public static final String DEFAULT_SASL_OAUTHBEARER_ASSERTION_ALGORITHM = "RS256"; + public static final String SASL_OAUTHBEARER_ASSERTION_ALGORITHM_DOC = "<p>The algorithm the Apache Kafka client should use to sign the assertion sent" + + " to the identity provider. It is also used as the value of the OAuth <code>alg</code> (Algorithm) header in the JWT assertion.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD = "sasl.oauthbearer.assertion.claim.aud"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD_DOC = "<p>The JWT <code>aud</code> (Audience) claim which will be included in the " + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS = "sasl.oauthbearer.assertion.claim.exp.seconds"; + public static final int DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS = 300; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS_DOC = "<p>The number of seconds <em>in the future</em> for which the JWT is valid." + + " The value is used to determine the JWT <code>exp</code> (Expiration) claim based on the current system time when the JWT is created.</p>" + + "<p>The formula to generate the <code>exp</code> claim is very simple:</p>" + + "<pre>" + + "Let:\n\n" + + " x = the current timestamp in seconds, on client\n" + + " y = the value of this configuration\n" + + "\n" + + "Then:\n\n" + + " exp = x + y\n" + + "</pre>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_ISS = "sasl.oauthbearer.assertion.claim.iss"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_ISS_DOC = "<p>The value to be used as the <code>iss</code> (Issuer) claim which will be included in the" + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE = "sasl.oauthbearer.assertion.claim.jti.include"; + public static final boolean DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE = false; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE_DOC = "<p>Flag that determines if the JWT assertion should generate a unique ID for the" + + " JWT and include it in the <code>jti</code> (JWT ID) claim.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS = "sasl.oauthbearer.assertion.claim.nbf.seconds"; + public static final int DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS = 60; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS_DOC = "<p>The number of seconds <em>in the past</em> from which the JWT is valid." + + " The value is used to determine the JWT <code>nbf</code> (Not Before) claim based on the current system time when the JWT is created.</p>" + + "<p>The formula to generate the <code>nbf</code> claim is very simple:</p>" + + "<pre>" + + "Let:\n\n" + + " x = the current timestamp in seconds, on client\n" + + " y = the value of this configuration\n" + + "\n" + + "Then:\n\n" + + " nbf = x - y\n" + + "</pre>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB = "sasl.oauthbearer.assertion.claim.sub"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB_DOC = "<p>The value to be used as the <code>sub</code> (Subject) claim which will be included in the" + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_FILE = "sasl.oauthbearer.assertion.file"; + public static final String SASL_OAUTHBEARER_ASSERTION_FILE_DOC = "<p>File that contains a <em>pre-generated</em> JWT assertion.</p>" + + "<p>The underlying implementation caches the file contents to avoid the performance hit of loading the file on each access. The caching mechanism will detect when" + + "the file changes to allow for the file to be reloaded on modifications. This allows for "live" assertion rotation without restarting the Kafka client.</p>" + + "<p>The file is the assertion in the serialized, three part JWT format:</p>" + + "<ol>" + + "<li>The <em>header</em> section is a base 64-encoded JWT header that contains values like <code>alg</code> (Algorithm)," + + " <code>typ</code> (Type, always the literal value <code>JWT</code>), etc.</li>" + + "<li>The <em>payload</em> section includes the base 64-encoded set of JWT claims, such as <code>aud</code> (Audience), <code>iss</code> (Issuer)," + + " <code>sub</code> (Subject), etc.</li>" + + "<li>The <em>signature</em> section is the concatenated <em>header</em> and <em>payload</em> sections that was signed using a private key</li>" + + "</ol>" + + "<p>See <a href=\"https://datatracker.ietf.org/doc/html/rfc7519\">RFC 7519</a> and <a href=\"https://datatracker.ietf.org/doc/html/rfc7515\">RFC 7515</a>" + + " for more details on the JWT and JWS formats.</p>" + + "<p><em>Note</em>: If a value for <code>sasl.oauthbearer.assertion.file</code> is provided, all other" + + " <code>sasl.oauthbearer.assertion.</code>* configuration are ignored.</p>"; + + public static final String SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE = "sasl.oauthbearer.assertion.private.key.file"; + public static final String SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE_DOC = "<p>File that contains a private key in the standard PEM format which is used to" + + " sign the JWT assertion sent to the identity provider.</p>" + + "<p>The underlying implementation caches the file contents to avoid the performance hit of loading the file on each access. The caching mechanism will detect when" + + "the file changes to allow for the file to be reloaded on modifications. This allows for "live" private key rotation without restarting the Kafka client.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_PASSPHRASE = "sasl.oauthbearer.assertion.private.key.passphrase"; + public static final String SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_PASSPHRASE_DOC = "<p>The optional passphrase to decrypt the private key file specified by" + + " <code>sasl.oauthbearer.assertion.private.key.file</code>.</p>" + + "<p><em>Note</em>: If the file referred to by <code>sasl.oauthbearer.assertion.private.key.file</code> is modified on the file system at runtime and it was" + + "created with a <em>different</em> passphrase than it was previously, the client will not be able to access the private key file because the passphrase is now" Review Comment: ```suggestion + " created with a <em>different</em> passphrase than it was previously, the client will not be able to access the private key file because the passphrase is now" ``` ########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetriever.java: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer; + +import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils; +import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpJwtRetriever; +import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpRequestFormatter; +import org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils; +import org.apache.kafka.common.security.oauthbearer.internals.secured.JwtBearerRequestFormatter; +import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionCreator; +import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionJwtTemplate; +import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.DefaultAssertionCreator; +import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.FileAssertionCreator; +import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.StaticAssertionJwtTemplate; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +import javax.net.ssl.SSLSocketFactory; +import javax.security.auth.login.AppConfigurationEntry; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_ALGORITHM; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_PASSPHRASE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionUtils.layeredAssertionJwtTemplate; + +/** + * {@code JwtBearerJwtRetriever} is a {@link JwtRetriever} that performs the steps to request + * a JWT from an OAuth/OIDC identity provider using the <code>urn:ietf:params:oauth:grant-type:jwt-bearer</code> + * grant type. This grant type is used for machine-to-machine "service accounts". + * + * <p/> + * + * This {@code JwtRetriever} is enabled by specifying its class name in the Kafka configuration. + * For client use, specify the class name in the <code>sasl.oauthbearer.jwt.retriever.class</code> + * configuration like so: + * + * <pre> + * sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever + * </pre> + * + * <p/> + * + * If using this {@code JwtRetriever} on the broker side (for inter-broker communication), the configuration + * should be specified with a listener-based property: + * + * <pre> + * listener.name.<listener name>.oauthbearer.sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever Review Comment: uhm having unescaped "\<listener name\>" within the \<pre\> will probably mess it up? (ha ha, I had to escape the tags myself on this comment to show properly) ########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/CachedFile.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.oauthbearer.internals.secured; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.oauthbearer.JwtValidatorException; +import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException; +import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + +/** + * {@code CachedFile} goes a little beyond the basic file caching mechanism by allowing the file to be "transformed" + * into an in-memory representation of the file contents for easier use by the caller. + * + * @param <T> Type of the "transformed" file contents + */ +public class CachedFile<T> { + + /** + * Function object that provides as arguments the file and its contents and returns the in-memory representation + * of the file contents. + */ + public interface Transformer<T> { + + /** + * Transforms the raw contents into a (possibly) different representation. + * + * @param file File containing the source data + * @param contents Data from file; could be zero length but not {@code null} + */ + T transform(File file, String contents); Review Comment: seems redundant that we need to pass the File and the contents, is it needed? (or could we consider passing the file only, and retrieve the contents from it?) ########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -129,6 +130,186 @@ public class SaslConfigs { + " authentication provider." + LOGIN_EXPONENTIAL_BACKOFF_NOTE; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "sasl.oauthbearer.jwt.retriever.class"; + public static final String DEFAULT_SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever"; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtRetriever</code> implementation used to" + + " request tokens from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.FileJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "sasl.oauthbearer.jwt.validator.class"; + public static final String DEFAULT_BROKER_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator"; + public static final String DEFAULT_CLIENT_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.ClientJwtValidator"; + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtValidator</code> implementation used to" + + " validate the JWT from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtValidator</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_GRANT_TYPE = "sasl.oauthbearer.grant.type"; + public static final String DEFAULT_SASL_OAUTHBEARER_GRANT_TYPE = "client_credentials"; + public static final String SASL_OAUTHBEARER_GRANT_TYPE_DOC = "The OAuth grant type to use when communicating with the identity provider. On the whole, the" + + " OAuth layer does not rely on this value and expects it to be used and/or verified for correctness by the <code>JwtRetriever</code> implementation." Review Comment: Is it fair to summarize that this is the `OAuth grant type to be used by the JWTRetriever impl when communicating with the identity provider`? ########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.oauthbearer.internals.secured.ClientCredentialsRequestFormatter; +import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils; +import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpJwtRetriever; +import org.apache.kafka.common.security.oauthbearer.internals.secured.HttpRequestFormatter; +import org.apache.kafka.common.security.oauthbearer.internals.secured.JaasOptionsUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +import javax.net.ssl.SSLSocketFactory; +import javax.security.auth.login.AppConfigurationEntry; + +import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG; +import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG; + +/** + * {@code ClientCredentialsJwtRetriever} is a {@link JwtRetriever} that performs the steps to request + * a JWT from an OAuth/OIDC identity provider using the <code>client_credentials</code> grant type. This + * grant type is commonly used for non-interactive "service accounts" where there is no user available + * to interactively supply credentials. + * + * <p/> + * + * This {@code JwtRetriever} is enabled by specifying its class name in the Kafka configuration. + * For client use, specify the class name in the <code>sasl.oauthbearer.jwt.retriever.class</code> + * configuration like so: + * + * <pre> + * sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever + * </pre> + * + * <p/> + * + * If using this {@code JwtRetriever} on the broker side (for inter-broker communication), the configuration + * should be specified with a listener-based property: + * + * <pre> + * listener.name.<listener name>.oauthbearer.sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever + * </pre> + * + * <p/> + * + * The {@code ClientCredentialsJwtRetriever} also uses the following configuration: + * + * <ul> + * <li><code>sasl.oauthbearer.client.credentials.client.id</code></li> + * <li><code>sasl.oauthbearer.client.credentials.client.secret</code></li> + * <li><code>sasl.oauthbearer.scope</code></li> + * <li><code>sasl.oauthbearer.token.endpoint.url</code></li> + * </ul> + * + * Please refer to the official Apache Kafka documentation for more information on these, and related, configuration. Review Comment: ```suggestion * Please refer to the official Apache Kafka documentation for more information on these, and related configuration. ``` ########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator.java: ########## @@ -50,9 +64,11 @@ * Basic structural validation of the <code>b64token</code> value as defined in * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section 2.1</a> * </li> - * <li>Basic conversion of the token into an in-memory data structure</li> * <li> - * Presence of scope, <code>exp</code>, subject, <code>iss</code>, and + * Basic conversion of the token into an in-memory data structure Review Comment: nit: since we're improving here, should we fix the "primary" above, to "performed (primarily by..." ########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -129,6 +130,186 @@ public class SaslConfigs { + " authentication provider." + LOGIN_EXPONENTIAL_BACKOFF_NOTE; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "sasl.oauthbearer.jwt.retriever.class"; + public static final String DEFAULT_SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever"; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtRetriever</code> implementation used to" + + " request tokens from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.FileJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "sasl.oauthbearer.jwt.validator.class"; + public static final String DEFAULT_BROKER_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator"; + public static final String DEFAULT_CLIENT_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.ClientJwtValidator"; Review Comment: why do we need to expose these 2 as public SaslConfigs? These are used by the `DefaultJwtValidator` implementation internally, so no need for a config, but I may be missing another expected usage? ########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtRetriever.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.security.oauthbearer.internals.secured.ClientCredentialsRequestFormatter; +import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils; +import org.apache.kafka.common.security.oauthbearer.internals.secured.JwtBearerRequestFormatter; +import org.apache.kafka.common.utils.Utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import javax.security.auth.login.AppConfigurationEntry; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_GRANT_TYPE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL; + +/** + * {@code DefaultJwtRetriever} instantiates and delegates {@link JwtRetriever} API calls to an embedded implementation + * based on configuration: + * + * <ul> + * <li> + * If the value of <code>sasl.oauthbearer.token.endpoint.url</code> is set to a value that starts with the + * <code>file</code> protocol (e.g. <code>file:/tmp/path/to/a/static-jwt.json</code>), an instance of + * {@link FileJwtRetriever} will be used as the underlying {@link JwtRetriever}. Otherwise, the URL is + * assumed to be an HTTP/HTTPS-based URL, and the value of + * <code>sasl.oauthbearer.grant.type</code> will be used to determine the class to use. + * </li> + * <li> + * If the grant type configuration <code>sasl.oauthbearer.grant.type</code> is not present <em>or</em> it + * is set to <code>client_credentials</code>, an instance of {@link ClientCredentialsRequestFormatter} will + * be created and used. If the grant type configuration value is set to + * <code>urn:ietf:params:oauth:grant-type:jwt-bearer</code>, then an instance of {@link JwtBearerJwtRetriever} + * will be used. + * </li> + * </ul> + * + * The configuration required by the individual {@code JwtRetriever} classes will likely differ. Please refer to the + * official Apache Kafka documentation for more information on these, and related, configuration. Review Comment: ```suggestion * official Apache Kafka documentation for more information on these, and related configuration. ``` ########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerUtils.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.oauthbearer; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerConfigurable; +import org.apache.kafka.common.utils.Utils; + +import java.util.List; +import java.util.Map; + +import javax.security.auth.login.AppConfigurationEntry; + +public class OAuthBearerUtils { + + public static <T> T getConfiguredInstanceOrDefault(Map<String, ?> configs, + String saslMechanism, + List<AppConfigurationEntry> jaasConfigEntries, + String configName, + Class<T> clazz) { + Object classOrClassName = configs.get(configName); + Object o; + + if (classOrClassName instanceof String) { + try { + o = Utils.newInstance((String) classOrClassName, clazz); + } catch (ClassNotFoundException e) { + throw new KafkaException("Class " + classOrClassName + " cannot be found", e); Review Comment: should we be more explicit on `Class X defined in config Y not found`? ########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator.java: ########## @@ -61,63 +77,50 @@ * </li> * </ol> */ - public class BrokerJwtValidator implements JwtValidator { private static final Logger log = LoggerFactory.getLogger(BrokerJwtValidator.class); - private final JwtConsumer jwtConsumer; + private final Optional<CloseableVerificationKeyResolver> verificationKeyResolverOpt; - private final String scopeClaimName; + private JwtConsumer jwtConsumer; - private final String subClaimName; + private String scopeClaimName; + + private String subClaimName; /** - * Creates a new {@code BrokerJwtValidator} that will be used by the broker for more - * thorough validation of the JWT. - * - * @param clockSkew The optional value (in seconds) to allow for differences - * between the time of the OAuth/OIDC identity provider and - * the broker. If <code>null</code> is provided, the broker - * and the OAUth/OIDC identity provider are assumed to have - * very close clock settings. - * @param expectedAudiences The (optional) set the broker will use to verify that - * the JWT was issued for one of the expected audiences. - * The JWT will be inspected for the standard OAuth - * <code>aud</code> claim and if this value is set, the - * broker will match the value from JWT's <code>aud</code> - * claim to see if there is an <b>exact</b> match. If there is no - * match, the broker will reject the JWT and authentication - * will fail. May be <code>null</code> to not perform any - * check to verify the JWT's <code>aud</code> claim matches any - * fixed set of known/expected audiences. - * @param expectedIssuer The (optional) value for the broker to use to verify that - * the JWT was created by the expected issuer. The JWT will - * be inspected for the standard OAuth <code>iss</code> claim - * and if this value is set, the broker will match it - * <b>exactly</b> against what is in the JWT's <code>iss</code> - * claim. If there is no match, the broker will reject the JWT - * and authentication will fail. May be <code>null</code> to not - * perform any check to verify the JWT's <code>iss</code> claim - * matches a specific issuer. - * @param verificationKeyResolver jose4j-based {@link VerificationKeyResolver} that is used - * to validate the signature matches the contents of the header - * and payload - * @param scopeClaimName Name of the scope claim to use; must be non-<code>null</code> - * @param subClaimName Name of the subject claim to use; must be - * non-<code>null</code> - * - * @see JwtConsumerBuilder - * @see JwtConsumer - * @see VerificationKeyResolver + * A public, no-args constructor is necessary for instantiation via configuration. */ + public BrokerJwtValidator() { + this.verificationKeyResolverOpt = Optional.empty(); + } + + /* + * Package-visible for testing. + */ + BrokerJwtValidator(CloseableVerificationKeyResolver verificationKeyResolver) { + this.verificationKeyResolverOpt = Optional.of(verificationKeyResolver); + } + + @Override + public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { + ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism); + List<String> expectedAudiencesList = cu.get(SASL_OAUTHBEARER_EXPECTED_AUDIENCE); + Set<String> expectedAudiences = expectedAudiencesList != null ? Set.copyOf(expectedAudiencesList) : null; + Integer clockSkew = cu.validateInteger(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, false); + String expectedIssuer = cu.validateString(SASL_OAUTHBEARER_EXPECTED_ISSUER, false); + String scopeClaimName = cu.validateString(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME); + String subClaimName = cu.validateString(SASL_OAUTHBEARER_SUB_CLAIM_NAME); + + CloseableVerificationKeyResolver verificationKeyResolver = null; + + if (verificationKeyResolverOpt.isPresent()) { + verificationKeyResolver = verificationKeyResolverOpt.get(); + } else { + verificationKeyResolver = VerificationKeyResolverFactory.get(configs, saslMechanism, jaasConfigEntries); + } Review Comment: maybe simplify to `verificationKeyResolverOpt.orElseGet....` ########## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtRetriever.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.oauthbearer; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.security.oauthbearer.internals.secured.ClientCredentialsRequestFormatter; +import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils; +import org.apache.kafka.common.security.oauthbearer.internals.secured.JwtBearerRequestFormatter; +import org.apache.kafka.common.utils.Utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import javax.security.auth.login.AppConfigurationEntry; + +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_GRANT_TYPE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL; + +/** + * {@code DefaultJwtRetriever} instantiates and delegates {@link JwtRetriever} API calls to an embedded implementation + * based on configuration: + * + * <ul> + * <li> + * If the value of <code>sasl.oauthbearer.token.endpoint.url</code> is set to a value that starts with the + * <code>file</code> protocol (e.g. <code>file:/tmp/path/to/a/static-jwt.json</code>), an instance of + * {@link FileJwtRetriever} will be used as the underlying {@link JwtRetriever}. Otherwise, the URL is + * assumed to be an HTTP/HTTPS-based URL, and the value of + * <code>sasl.oauthbearer.grant.type</code> will be used to determine the class to use. + * </li> + * <li> + * If the grant type configuration <code>sasl.oauthbearer.grant.type</code> is not present <em>or</em> it + * is set to <code>client_credentials</code>, an instance of {@link ClientCredentialsRequestFormatter} will + * be created and used. If the grant type configuration value is set to + * <code>urn:ietf:params:oauth:grant-type:jwt-bearer</code>, then an instance of {@link JwtBearerJwtRetriever} + * will be used. + * </li> + * </ul> + * + * The configuration required by the individual {@code JwtRetriever} classes will likely differ. Please refer to the + * official Apache Kafka documentation for more information on these, and related, configuration. + */ +public class DefaultJwtRetriever implements JwtRetriever { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultJwtRetriever.class); + + private JwtRetriever delegate; + + @Override + public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { + ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism); + URL tokenEndpointUrl = cu.validateUrl(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL); + + if (tokenEndpointUrl.getProtocol().toLowerCase(Locale.ROOT).equals("file")) { + delegate = new FileJwtRetriever(); + } else { + String grantType = cu.validateString(SASL_OAUTHBEARER_GRANT_TYPE, false); + + if (grantType == null || grantType.equalsIgnoreCase(ClientCredentialsRequestFormatter.GRANT_TYPE)) { + delegate = new ClientCredentialsJwtRetriever(); + } else if (grantType.equalsIgnoreCase(JwtBearerRequestFormatter.GRANT_TYPE)) { + delegate = new JwtBearerJwtRetriever(); + } else { + throw new ConfigException("The grant type \"" + grantType + "\" is not supported by the class " + getClass().getName()); Review Comment: this error is truly about conflicting configs (expected headache), so should we clearly state them both to make troubleshooting easier? ~(Grant type X defined in config SASL_OAUTHBEARER_GRANT_TYPE not supported by retriever defined in config SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS) ########## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ########## @@ -129,6 +130,186 @@ public class SaslConfigs { + " authentication provider." + LOGIN_EXPONENTIAL_BACKOFF_NOTE; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "sasl.oauthbearer.jwt.retriever.class"; + public static final String DEFAULT_SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS = "org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever"; + public static final String SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtRetriever</code> implementation used to" + + " request tokens from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientCredentialsJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.FileJwtRetriever</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "sasl.oauthbearer.jwt.validator.class"; + public static final String DEFAULT_BROKER_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator"; + public static final String DEFAULT_CLIENT_SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS = "org.apache.kafka.common.security.oauthbearer.ClientJwtValidator"; + public static final String SASL_OAUTHBEARER_JWT_VALIDATOR_CLASS_DOC = "<p>The fully-qualified class name of a <code>JwtValidator</code> implementation used to" + + " validate the JWT from the identity provider.</p>" + + "<p>The default configuration value represents a class that maintains backward compatibility with previous versions of" + + " Apache Kafka. The default implementation uses the configuration to determine which concrete implementation to create." + + "<p>Other implementations that are provided include:</p>" + + "<ul>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.ClientJwtValidator</code></li>" + + "<li><code>org.apache.kafka.common.security.oauthbearer.DefaultJwtValidator</code></li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_GRANT_TYPE = "sasl.oauthbearer.grant.type"; + public static final String DEFAULT_SASL_OAUTHBEARER_GRANT_TYPE = "client_credentials"; + public static final String SASL_OAUTHBEARER_GRANT_TYPE_DOC = "The OAuth grant type to use when communicating with the identity provider. On the whole, the" + + " OAuth layer does not rely on this value and expects it to be used and/or verified for correctness by the <code>JwtRetriever</code> implementation." + + " The default value of <code>client_credentials</code> maintains backward compatibility. The built-in grant types are:" + + "<ul>" + + "<li><code>client_credentials</code></li>" + + "<li><code>urn:ietf:params:oauth:grant-type:jwt-bearer</code></li>" + + "</ul>" + + "<p>The OAuth code in Apache Kafka does not limit the values that are used. A user can write a custom <code>JwtRetriever</code> implementation that uses" + + " a completely different grant type, if desired.</p>"; + + public static final String SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope"; + public static final String SASL_OAUTHBEARER_SCOPE_DOC = "<p>This is the level of access a client application is granted to a resource or API which is" + + " included in the token request. If provided, it should match one or more scopes configured in the identity provider.</p>" + + "<p>" + + "The scope was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>scope</code>." + + " For backward compatibility, the <code>scope</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.scope</code> from configuration</li>" + + "<li><code>scope</code> from JAAS</li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID = "sasl.oauthbearer.client.credentials.client.id"; + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID_DOC = "<p>The ID (defined in/by the OAuth identity provider) to identify the client" + + " requesting the token.</p>" + + "<p>" + + "The client ID was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>clientId</code>." + + " For backward compatibility, the <code>clientId</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.client.credentials.client.id</code> from configuration</li>" + + "<li><code>clientId</code> from JAAS</li>" + + "</ul>"; + + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET = "sasl.oauthbearer.client.credentials.client.secret"; + public static final String SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET_DOC = "<p>The secret (defined by either the user or preassigned, depending on the" + + " identity provider) of the client requesting the token.</p>" + + "<p>" + + "The client secret was previously stored as part of the <code>sasl.jaas.config</code> configuration with the key <code>clientSecret</code>." + + " For backward compatibility, the <code>clientSecret</code> JAAS option can still be used, but it is deprecated and will be removed in a future version." + + "</p>" + + "<p>Order of precedence:</p>" + + "<ul>" + + "<li><code>sasl.oauthbearer.client.credentials.client.secret</code> from configuration</li>" + + "<li><code>clientSecret</code> from JAAS</li>" + + "</ul>"; + + private static final String ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE = "<p><em>Note</em>: If a value for <code>sasl.oauthbearer.assertion.file</code> is provided," + + " this configuration will be ignored.</p>"; + + public static final String SASL_OAUTHBEARER_ASSERTION_ALGORITHM = "sasl.oauthbearer.assertion.algorithm"; + public static final String DEFAULT_SASL_OAUTHBEARER_ASSERTION_ALGORITHM = "RS256"; + public static final String SASL_OAUTHBEARER_ASSERTION_ALGORITHM_DOC = "<p>The algorithm the Apache Kafka client should use to sign the assertion sent" + + " to the identity provider. It is also used as the value of the OAuth <code>alg</code> (Algorithm) header in the JWT assertion.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD = "sasl.oauthbearer.assertion.claim.aud"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD_DOC = "<p>The JWT <code>aud</code> (Audience) claim which will be included in the " + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS = "sasl.oauthbearer.assertion.claim.exp.seconds"; + public static final int DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS = 300; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_EXP_SECONDS_DOC = "<p>The number of seconds <em>in the future</em> for which the JWT is valid." + + " The value is used to determine the JWT <code>exp</code> (Expiration) claim based on the current system time when the JWT is created.</p>" + + "<p>The formula to generate the <code>exp</code> claim is very simple:</p>" + + "<pre>" + + "Let:\n\n" + + " x = the current timestamp in seconds, on client\n" + + " y = the value of this configuration\n" + + "\n" + + "Then:\n\n" + + " exp = x + y\n" + + "</pre>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_ISS = "sasl.oauthbearer.assertion.claim.iss"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_ISS_DOC = "<p>The value to be used as the <code>iss</code> (Issuer) claim which will be included in the" + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE = "sasl.oauthbearer.assertion.claim.jti.include"; + public static final boolean DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE = false; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_JTI_INCLUDE_DOC = "<p>Flag that determines if the JWT assertion should generate a unique ID for the" + + " JWT and include it in the <code>jti</code> (JWT ID) claim.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS = "sasl.oauthbearer.assertion.claim.nbf.seconds"; + public static final int DEFAULT_SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS = 60; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_NBF_SECONDS_DOC = "<p>The number of seconds <em>in the past</em> from which the JWT is valid." + + " The value is used to determine the JWT <code>nbf</code> (Not Before) claim based on the current system time when the JWT is created.</p>" + + "<p>The formula to generate the <code>nbf</code> claim is very simple:</p>" + + "<pre>" + + "Let:\n\n" + + " x = the current timestamp in seconds, on client\n" + + " y = the value of this configuration\n" + + "\n" + + "Then:\n\n" + + " nbf = x - y\n" + + "</pre>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB = "sasl.oauthbearer.assertion.claim.sub"; + public static final String SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB_DOC = "<p>The value to be used as the <code>sub</code> (Subject) claim which will be included in the" + + " client JWT assertion created locally.</p>" + + ASSERTION_FILE_MUTUAL_EXCLUSION_NOTICE; + + public static final String SASL_OAUTHBEARER_ASSERTION_FILE = "sasl.oauthbearer.assertion.file"; + public static final String SASL_OAUTHBEARER_ASSERTION_FILE_DOC = "<p>File that contains a <em>pre-generated</em> JWT assertion.</p>" + + "<p>The underlying implementation caches the file contents to avoid the performance hit of loading the file on each access. The caching mechanism will detect when" + + "the file changes to allow for the file to be reloaded on modifications. This allows for "live" assertion rotation without restarting the Kafka client.</p>" + + "<p>The file is the assertion in the serialized, three part JWT format:</p>" + + "<ol>" + + "<li>The <em>header</em> section is a base 64-encoded JWT header that contains values like <code>alg</code> (Algorithm)," + + " <code>typ</code> (Type, always the literal value <code>JWT</code>), etc.</li>" + + "<li>The <em>payload</em> section includes the base 64-encoded set of JWT claims, such as <code>aud</code> (Audience), <code>iss</code> (Issuer)," + + " <code>sub</code> (Subject), etc.</li>" + + "<li>The <em>signature</em> section is the concatenated <em>header</em> and <em>payload</em> sections that was signed using a private key</li>" + + "</ol>" + + "<p>See <a href=\"https://datatracker.ietf.org/doc/html/rfc7519\">RFC 7519</a> and <a href=\"https://datatracker.ietf.org/doc/html/rfc7515\">RFC 7515</a>" + + " for more details on the JWT and JWS formats.</p>" + + "<p><em>Note</em>: If a value for <code>sasl.oauthbearer.assertion.file</code> is provided, all other" + + " <code>sasl.oauthbearer.assertion.</code>* configuration are ignored.</p>"; + + public static final String SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE = "sasl.oauthbearer.assertion.private.key.file"; + public static final String SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE_DOC = "<p>File that contains a private key in the standard PEM format which is used to" + + " sign the JWT assertion sent to the identity provider.</p>" + + "<p>The underlying implementation caches the file contents to avoid the performance hit of loading the file on each access. The caching mechanism will detect when" + + "the file changes to allow for the file to be reloaded on modifications. This allows for "live" private key rotation without restarting the Kafka client.</p>" Review Comment: ```suggestion + " the file changes to allow for the file to be reloaded on modifications. This allows for "live" private key rotation without restarting the Kafka client.</p>" ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org