kirktrue commented on code in PR #21483:
URL: https://github.com/apache/kafka/pull/21483#discussion_r2825442573


##########
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/AssertionSupplierFactory.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.oauthbearer.internals.secured.assertion;
+
+import org.apache.kafka.common.security.oauthbearer.JwtRetrieverException;
+import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_ALGORITHM;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_PASSPHRASE;
+import static org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionUtils.layeredAssertionJwtTemplate;
+
+/**
+ * Factory class for creating JWT assertion suppliers used in OAuth 2.0 client authentication.
+ *
+ * <p>
+ * This factory supports two methods of obtaining JWT assertions for client authentication:
+ * <ul>
+ *   <li><b>File-based assertions:</b> Pre-generated JWT assertions read from a file specified by
+ *       {@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_ASSERTION_FILE}.
+ *       This is useful for testing or when assertions are managed externally.</li>
+ *   <li><b>Locally-generated assertions:</b> JWTs dynamically created and signed using a private key
+ *       specified by {@link org.apache.kafka.common.config.SaslConfigs#SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE}.
+ *       This is the recommended approach for production use.</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * The created supplier can be invoked repeatedly to obtain assertions as needed (for example, when
+ * refreshing tokens). For file-based assertions, the same assertion is returned on each invocation.

Review Comment:
   Will it still return "the same assertion" if the underlying assertion file 
changes on disk between invocations?
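
To make the question concrete, here is a minimal sketch of a supplier that re-reads the file on every call, so a rotated assertion is picked up on the next invocation. The class name `RereadingFileAssertionSupplier` is hypothetical and is not the supplier built by `AssertionSupplierFactory`; whether the PR's supplier caches or re-reads is exactly what the comment above is asking.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Supplier;

// Hypothetical illustration only; not the supplier created by AssertionSupplierFactory.
final class RereadingFileAssertionSupplier implements Supplier<String> {
    private final Path assertionFile;

    RereadingFileAssertionSupplier(Path assertionFile) {
        this.assertionFile = assertionFile;
    }

    @Override
    public String get() {
        try {
            // Read on every call so that a file replaced on disk is picked up.
            return Files.readString(assertionFile, StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("assertion", ".jwt");
        try {
            Supplier<String> supplier = new RereadingFileAssertionSupplier(file);
            Files.writeString(file, "first.jwt.value\n");
            System.out.println(supplier.get());
            // Simulate an external process rotating the assertion.
            Files.writeString(file, "second.jwt.value\n");
            System.out.println(supplier.get());
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

If the real supplier caches the contents instead, the Javadoc sentence is accurate but a rotated file would only take effect after a restart; either way the Javadoc should probably say which behavior applies.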



##########
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpJwtRetriever.java:
##########
@@ -313,4 +314,12 @@ static String formatErrorMessage(String errorResponseBody) {
         }
         return String.format("{%s}", errorResponseBody);
     }
+
+    @Override
+    public void close() throws IOException {
+        // Close the request formatter if it implements Closeable (e.g., ClientAssertionRequestFormatter)
+        if (requestFormatter instanceof Closeable) {
+            ((Closeable) requestFormatter).close();
+        }

Review Comment:
   You might use the following idiom for brevity:
   
   ```suggestion
           Utils.maybeCloseQuietly(requestFormatter, "request formatter");
   ```
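
For readers unfamiliar with the idiom, the following standalone sketch shows the general shape of a "maybe close quietly" helper. `QuietCloseSketch` and `FailingFormatter` are hypothetical stand-ins written for this note, not Kafka's `Utils` implementation; the assumption is that the real helper likewise checks for `AutoCloseable` and logs rather than rethrows.

```java
import java.io.Closeable;
import java.io.IOException;

// Stand-in for the idea behind a "maybe close quietly" helper; all names are hypothetical.
final class QuietCloseSketch {

    // Closes the target only if it is AutoCloseable, and reports failures instead of throwing.
    static void maybeCloseQuietly(Object target, String name) {
        if (!(target instanceof AutoCloseable)) {
            return;
        }
        try {
            ((AutoCloseable) target).close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);
        }
    }

    // Test double whose close() always fails, to show the exception is swallowed.
    static final class FailingFormatter implements Closeable {
        boolean closed;

        @Override
        public void close() throws IOException {
            closed = true;
            throw new IOException("simulated failure");
        }
    }

    public static void main(String[] args) {
        FailingFormatter formatter = new FailingFormatter();
        maybeCloseQuietly(formatter, "request formatter");
        System.out.println("close attempted: " + formatter.closed);
        // Non-closeable and null targets are ignored.
        maybeCloseQuietly("not closeable", "string");
        maybeCloseQuietly(null, "nothing");
    }
}
```

One trade-off to weigh: with a quiet close, a failure in the formatter's `close()` no longer propagates as an `IOException` from `HttpJwtRetriever.close()`, so callers would only see it in the logs.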



##########
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/AssertionUtils.java:
##########
@@ -79,15 +80,27 @@ public static PrivateKey privateKey(byte[] privateKeyContents,
             keySpec = new PKCS8EncodedKeySpec(pkcs8EncodedBytes);
         }
 
-        KeyFactory keyFactory = KeyFactory.getInstance("RSA");
-        return keyFactory.generatePrivate(keySpec);
+        // Try RSA first, then EC. PKCS#8 encoded keys are algorithm-agnostic, so we need to
+        // attempt multiple key factories to determine the correct key type.
+        for (String keyAlgorithm : new String[]{"RSA", "EC"}) {
+            try {
+                KeyFactory keyFactory = KeyFactory.getInstance(keyAlgorithm);
+                return keyFactory.generatePrivate(keySpec);
+            } catch (GeneralSecurityException e) {
+                // Try next algorithm
+            }
+        }
+
+        throw new GeneralSecurityException("Unable to load private key: unsupported key type (tried RSA, EC)");
     }
 
     public static Signature getSignature(String algorithm) throws GeneralSecurityException {
         if (algorithm.equalsIgnoreCase(TOKEN_SIGNING_ALGORITHM_RS256)) {
             return Signature.getInstance("SHA256withRSA");
         } else if (algorithm.equalsIgnoreCase(TOKEN_SIGNING_ALGORITHM_ES256)) {
-            return Signature.getInstance("SHA256withECDSA");
+            // Use P1363 format which produces raw R||S concatenation as required by JWS (RFC 7515).
+            // Java's default SHA256withECDSA uses DER encoding which is not compatible with JWT signatures.
+            return Signature.getInstance("SHA256withECDSAinP1363Format");

Review Comment:
   As you can guess, we mostly tested RSA.
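
Since ES256 saw less testing, a small standalone check of the two JCA signature encodings may help. The sketch below signs the same input with `SHA256withECDSA` and `SHA256withECDSAinP1363Format` (available since Java 9) using a freshly generated P-256 key; the class and method names are hypothetical and nothing here comes from `AssertionUtils`.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.Signature;
import java.security.spec.ECGenParameterSpec;

// Hypothetical helper class for comparing ECDSA signature encodings.
final class Es256SignatureFormatSketch {

    static KeyPair newP256KeyPair() throws GeneralSecurityException {
        KeyPairGenerator generator = KeyPairGenerator.getInstance("EC");
        generator.initialize(new ECGenParameterSpec("secp256r1"));
        return generator.generateKeyPair();
    }

    static byte[] sign(String jcaAlgorithm, KeyPair keyPair, byte[] data) throws GeneralSecurityException {
        Signature signature = Signature.getInstance(jcaAlgorithm);
        signature.initSign(keyPair.getPrivate());
        signature.update(data);
        return signature.sign();
    }

    static boolean verify(String jcaAlgorithm, KeyPair keyPair, byte[] data, byte[] signatureBytes)
            throws GeneralSecurityException {
        Signature signature = Signature.getInstance(jcaAlgorithm);
        signature.initVerify(keyPair.getPublic());
        signature.update(data);
        return signature.verify(signatureBytes);
    }

    public static void main(String[] args) throws GeneralSecurityException {
        KeyPair keyPair = newP256KeyPair();
        byte[] signingInput = "header.payload".getBytes(StandardCharsets.US_ASCII);

        // DER output is an ASN.1 SEQUENCE of two INTEGERs; its length varies between signatures.
        byte[] der = sign("SHA256withECDSA", keyPair, signingInput);
        // P1363 output is the fixed-width concatenation R||S, which is what JWS expects for ES256.
        byte[] p1363 = sign("SHA256withECDSAinP1363Format", keyPair, signingInput);

        System.out.println("DER length:   " + der.length);
        System.out.println("P1363 length: " + p1363.length);
        System.out.println("P1363 verifies: "
                + verify("SHA256withECDSAinP1363Format", keyPair, signingInput, p1363));
    }
}
```

For a P-256 key the P1363 signature is always 64 bytes (32 for R, 32 for S), which makes a cheap regression assertion for an ES256 unit test.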



##########
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/AssertionUtils.java:
##########
@@ -67,7 +67,8 @@ public static PrivateKey privateKey(byte[] privateKeyContents,
         PKCS8EncodedKeySpec keySpec;
 
         if (passphrase.isPresent()) {
-            EncryptedPrivateKeyInfo keyInfo = new EncryptedPrivateKeyInfo(privateKeyContents);
+            byte[] derEncoded = Base64.getDecoder().decode(privateKeyContents);
+            EncryptedPrivateKeyInfo keyInfo = new EncryptedPrivateKeyInfo(derEncoded);

Review Comment:
   Is this a bug in the original implementation?



##########
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ClientAssertionRequestFormatter.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.CloseableSupplier;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@code ClientAssertionRequestFormatter} is an {@link HttpRequestFormatter} that formats HTTP requests
+ * for OAuth 2.0 token requests using the <code>client_credentials</code> grant type with client assertion
+ * authentication (RFC 7521 and RFC 7523).
+ *
+ * <p>
+ * This formatter is used when authenticating to an OAuth provider using a client assertion (JWT)
+ * instead of a client secret. The assertion is typically a self-signed JWT that proves the client's
+ * identity. This method is more secure than client secrets as the assertion is short-lived and the
+ * private key used to sign it never leaves the client.
+ * </p>
+ *
+ * <p>
+ * The formatter creates HTTP requests with:
+ * <ul>
+ *   <li>Content-Type: application/x-www-form-urlencoded</li>
+ *   <li>grant_type=client_credentials</li>
+ *   <li>client_assertion_type=urn:ietf:params:oauth:client-assertion-type:jwt-bearer</li>
+ *   <li>client_assertion=&lt;JWT assertion&gt;</li>
+ *   <li>Optional client_id parameter</li>
+ *   <li>Optional scope parameter</li>
+ * </ul>
+ * </p>
+ *
+ * @see ClientSecretRequestFormatter
+ * @see <a href="https://datatracker.ietf.org/doc/html/rfc7521">RFC 7521: Assertion Framework for OAuth 2.0</a>
+ * @see <a href="https://datatracker.ietf.org/doc/html/rfc7523">RFC 7523: JWT Profile for OAuth 2.0 Client Authentication</a>
+ */
+public class ClientAssertionRequestFormatter implements HttpRequestFormatter, Closeable {
+    public static final String GRANT_TYPE = "client_credentials";
+    private final String clientId;
+    private final String scope;
+    private final CloseableSupplier<String> assertionSupplier;
+
+    /**
+     * Creates a new {@code ClientAssertionRequestFormatter} instance.
+     *
+     * @param clientId          The OAuth client ID; may be {@code null} if the assertion contains the client ID
+     * @param scope             The OAuth scope to request; may be {@code null} for default scope
+     * @param assertionSupplier A closeable supplier that provides the JWT assertion string when invoked;
+     *                          this formatter takes ownership and will close it when {@link #close()} is called
+     */
+    public ClientAssertionRequestFormatter(String clientId, String scope, CloseableSupplier<String> assertionSupplier) {
+        this.clientId = clientId;
+        this.scope = scope;
+        this.assertionSupplier = assertionSupplier;
+    }
+
+    @Override
+    public Map<String, String> formatHeaders() {
+        Map<String, String> headers = new HashMap<>();
+        headers.put("Accept", "application/json");
+        headers.put("Cache-Control", "no-cache");
+        headers.put("Content-Type", "application/x-www-form-urlencoded");
+        return headers;
+    }
+
+    @Override
+    public String formatBody() {
+        StringBuilder requestParameters = new StringBuilder();
+        requestParameters.append("client_assertion_type=").append("urn%3Aietf%3Aparams%3Aoauth%3Aclient-assertion-type%3Ajwt-bearer");
+        requestParameters.append("&client_assertion=").append(assertionSupplier.get());
+        requestParameters.append("&grant_type=").append(URLEncoder.encode(GRANT_TYPE, StandardCharsets.UTF_8));
+        if (clientId != null && !clientId.isEmpty()) {
+            requestParameters.append("&client_id=").append(clientId);
+        }
+        if (scope != null && !scope.trim().isEmpty()) {
+            String encodedScope = URLEncoder.encode(scope.trim(), StandardCharsets.UTF_8);
+            requestParameters.append("&scope=").append(encodedScope);
+        }

Review Comment:
   Could we use `Utils.isBlank()` for these checks? Super minor but reads a 
little cleaner.
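
A rough sketch of how the two checks might read with a blank-check helper. The local `isBlank` below is a stand-in with the assumed semantics (null, empty, or whitespace-only), and `BlankCheckSketch` and `optionalParameters` are hypothetical names used only for this illustration.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the optional-parameter checks using a blank-check helper.
final class BlankCheckSketch {

    // Local stand-in: true for null, empty, or whitespace-only strings.
    static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    // Builds only the optional tail of the form body (client_id and scope).
    static String optionalParameters(String clientId, String scope) {
        StringBuilder requestParameters = new StringBuilder();
        if (!isBlank(clientId)) {
            // Appended as-is, matching the hunk above.
            requestParameters.append("&client_id=").append(clientId);
        }
        if (!isBlank(scope)) {
            String encodedScope = URLEncoder.encode(scope.trim(), StandardCharsets.UTF_8);
            requestParameters.append("&scope=").append(encodedScope);
        }
        return requestParameters.toString();
    }

    public static void main(String[] args) {
        System.out.println(optionalParameters("my-client", " read write "));
        System.out.println(optionalParameters(null, null).isEmpty());
    }
}
```

Note the small behavior change for `clientId`: the current check uses `isEmpty()`, so a whitespace-only client ID is sent today but would be skipped with a blank check.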



##########
gradle/dependencies.gradle:
##########
@@ -229,6 +231,8 @@ libs += [
   spotbugs: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
   swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$swaggerVersion",
  mockOAuth2Server: "no.nav.security:mock-oauth2-server:$versions.mockOAuth2Server",
+  testcontainersJunitJupiter: "org.testcontainers:junit-jupiter:$versions.testcontainers",
+  testcontainersKeycloak: "com.github.dasniko:testcontainers-keycloak:$versions.testcontainersKeycloak",

Review Comment:
   Super nitpicky, but... With a couple of exceptions, it seems like this array 
is _supposed_ to be sorted alphabetically, though I understand wanting to keep 
the OAuth-related libraries together.
   
   @omkreddy—any guidance on this?



##########
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpRequestFormatterFactory.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.oauthbearer.internals.secured;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.AssertionSupplierFactory;
+import org.apache.kafka.common.security.oauthbearer.internals.secured.assertion.CloseableSupplier;
+import org.apache.kafka.common.utils.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_ISS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
+
+/**
+ * Factory class for creating {@link HttpRequestFormatter} instances based on the OAuth authentication
+ * method configured.
+ *
+ * <p>
+ * This factory implements a three-tier fallback mechanism: file-based assertion (first), locally-generated
+ * assertion (second), or client secret (third/fallback). When multiple authentication methods are configured,
+ * the first preference takes precedence and other configurations are ignored with a WARN log message.
+ * </p>
+ *
+ * <p>
+ * The factory handles reading configuration values from both the main configuration and JAAS options
+ * (for backward compatibility) using {@link ConfigOrJaas}.
+ * </p>
+ */
+public class HttpRequestFormatterFactory {

Review Comment:
   I'm wondering if it makes more sense to name this 
`ClientCredentialsRequestFormatter` and make it a super class of 
`ClientAssertionRequestFormatter` and `ClientSecretRequestFormatter`. Unless we 
foresee the method returning other types besides those two.
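
One possible shape for that hierarchy, as a sketch only: a shared base class owns the `grant_type` and `scope` handling, and each subclass contributes just the parameters that identify the client. All class names ending in `Sketch` are hypothetical, the real `HttpRequestFormatter` interface is not reproduced here, and a client-secret subclass would presumably differ mainly in its headers rather than its body.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Supplier;

// Hypothetical base class: shared client_credentials body formatting.
abstract class ClientCredentialsFormatterSketch {
    static final String GRANT_TYPE = "client_credentials";
    private final String scope;

    ClientCredentialsFormatterSketch(String scope) {
        this.scope = scope;
    }

    // Subclasses return only the body parameters specific to their authentication method.
    abstract Map<String, String> credentialParameters();

    final String formatBody() {
        Map<String, String> parameters = new LinkedHashMap<>();
        parameters.put("grant_type", GRANT_TYPE);
        parameters.putAll(credentialParameters());
        if (scope != null && !scope.trim().isEmpty()) {
            parameters.put("scope", scope.trim());
        }
        // Encode every value uniformly and join as a form body.
        StringJoiner body = new StringJoiner("&");
        parameters.forEach((name, value) ->
                body.add(name + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8)));
        return body.toString();
    }
}

// Hypothetical subclass for client assertion authentication.
final class AssertionFormatterSketch extends ClientCredentialsFormatterSketch {
    private final Supplier<String> assertionSupplier;

    AssertionFormatterSketch(String scope, Supplier<String> assertionSupplier) {
        super(scope);
        this.assertionSupplier = assertionSupplier;
    }

    @Override
    Map<String, String> credentialParameters() {
        Map<String, String> parameters = new LinkedHashMap<>();
        parameters.put("client_assertion_type", "urn:ietf:params:oauth:client-assertion-type:jwt-bearer");
        parameters.put("client_assertion", assertionSupplier.get());
        return parameters;
    }

    public static void main(String[] args) {
        System.out.println(new AssertionFormatterSketch("read", () -> "a.b.c").formatBody());
    }
}
```

The selection logic (file-based, locally generated, or client secret) would still need a home, for example a static factory method on the base class, so the three-tier fallback described in the Javadoc is not lost.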



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
