This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 728666f3ad6 KAFKA-15502: Update SslEngineValidator to handle large 
stores (#14445)
728666f3ad6 is described below

commit 728666f3ad61e9bd49719ee7220bb2dd946fcb55
Author: Manikumar Reddy <manikumar.re...@gmail.com>
AuthorDate: Mon Sep 25 22:46:30 2023 -0700

    KAFKA-15502: Update SslEngineValidator to handle large stores (#14445)
    
    We have observed an issue where the inter-broker SSL listener does not 
come up when running with TLSv1.3 on JDK 17.
    SSL debug logs show that TLSv1.3 post-handshake messages larger than 16K 
are not read, which causes the SslEngineValidator process to get stuck while 
validating the provided trust/key store.
    
    - Right now, WRAP returns if there is already data in the buffer. But if we 
need more data to be wrapped for UNWRAP to succeed, we end up looping forever. 
To fix this, we now always attempt WRAP and only return early on 
BUFFER_OVERFLOW when the buffer still holds data that the peer has not yet 
consumed.
    - Update SslEngineValidator to unwrap post-handshake messages from the peer 
when the local handshake status is FINISHED.
    
    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>
---
 .../kafka/common/security/ssl/SslFactory.java      |  58 ++++---
 .../kafka/common/security/ssl/SslFactoryTest.java  |  29 +++-
 .../java/org/apache/kafka/test/TestSslUtils.java   | 193 +++++++++++++++++++++
 3 files changed, 251 insertions(+), 29 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 65c37aa6b47..a0a7f7239ee 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -419,12 +419,12 @@ public class SslFactory implements Reconfigurable, 
Closeable {
             while (true) {
                 switch (handshakeStatus) {
                     case NEED_WRAP:
-                        if (netBuffer.position() != 0) // Wait for peer to 
consume previously wrapped data
-                            return;
                         handshakeResult = sslEngine.wrap(EMPTY_BUF, netBuffer);
                         switch (handshakeResult.getStatus()) {
                             case OK: break;
                             case BUFFER_OVERFLOW:
+                                if (netBuffer.position() != 0) // Wait for 
peer to consume previously wrapped data
+                                    return;
                                 netBuffer.compact();
                                 netBuffer = Utils.ensureCapacity(netBuffer, 
sslEngine.getSession().getPacketBufferSize());
                                 netBuffer.flip();
@@ -436,24 +436,8 @@ public class SslFactory implements Reconfigurable, 
Closeable {
                         }
                         return;
                     case NEED_UNWRAP:
-                        if (peerValidator.netBuffer.position() == 0) // no 
data to unwrap, return to process peer
-                            return;
-                        peerValidator.netBuffer.flip(); // unwrap the data 
from peer
-                        handshakeResult = 
sslEngine.unwrap(peerValidator.netBuffer, appBuffer);
-                        peerValidator.netBuffer.compact();
-                        handshakeStatus = handshakeResult.getHandshakeStatus();
-                        switch (handshakeResult.getStatus()) {
-                            case OK: break;
-                            case BUFFER_OVERFLOW:
-                                appBuffer = Utils.ensureCapacity(appBuffer, 
sslEngine.getSession().getApplicationBufferSize());
-                                break;
-                            case BUFFER_UNDERFLOW:
-                                netBuffer = Utils.ensureCapacity(netBuffer, 
sslEngine.getSession().getPacketBufferSize());
-                                break;
-                            case CLOSED:
-                            default:
-                                throw new SSLException("Unexpected handshake 
status: " + handshakeResult.getStatus());
-                        }
+                        handshakeStatus = unwrap(peerValidator, true);
+                        if (handshakeStatus == null) return;
                         break;
                     case NEED_TASK:
                         sslEngine.getDelegatedTask().run();
@@ -463,14 +447,44 @@ public class SslFactory implements Reconfigurable, 
Closeable {
                         return;
                     case NOT_HANDSHAKING:
                         if (handshakeResult.getHandshakeStatus() != 
SSLEngineResult.HandshakeStatus.FINISHED)
-                            throw new SSLException("Did not finish handshake");
+                            throw new SSLException("Did not finish handshake, 
handshake status: " + handshakeResult.getHandshakeStatus());
+                        else if (peerValidator.netBuffer.position() != 0) {
+                            unwrap(peerValidator, false);
+                        }
                         return;
                     default:
-                        throw new IllegalStateException("Unexpected handshake 
status " + handshakeStatus);
+                        throw new IllegalStateException("Unexpected handshake 
status: " + handshakeStatus);
                 }
             }
         }
 
+        private SSLEngineResult.HandshakeStatus unwrap(SslEngineValidator 
peerValidator, boolean updateHandshakeResult) throws SSLException {
+            // Unwrap regardless of whether there is data in the buffer to 
ensure that
+            // handshake status is updated if required.
+            peerValidator.netBuffer.flip(); // unwrap the data from peer
+            SSLEngineResult sslEngineResult = 
sslEngine.unwrap(peerValidator.netBuffer, appBuffer);
+            if (updateHandshakeResult) {
+                handshakeResult = sslEngineResult;
+            }
+            peerValidator.netBuffer.compact();
+            SSLEngineResult.HandshakeStatus handshakeStatus = 
sslEngineResult.getHandshakeStatus();
+            switch (sslEngineResult.getStatus()) {
+                case OK: break;
+                case BUFFER_OVERFLOW:
+                    appBuffer = Utils.ensureCapacity(appBuffer, 
sslEngine.getSession().getApplicationBufferSize());
+                    break;
+                case BUFFER_UNDERFLOW:
+                    netBuffer = Utils.ensureCapacity(netBuffer, 
sslEngine.getSession().getPacketBufferSize());
+                    // BUFFER_UNDERFLOW typically indicates that we need more 
data from peer,
+                    // so return to process peer.
+                    return null;
+                case CLOSED:
+                default:
+                    throw new SSLException("Unexpected handshake status: " + 
sslEngineResult.getStatus());
+            }
+            return handshakeStatus;
+        }
+
         boolean complete() {
             return sslEngine.getHandshakeStatus() == 
SSLEngineResult.HandshakeStatus.FINISHED ||
                     sslEngine.getHandshakeStatus() == 
SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index 7ac707b5de7..922166020ff 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -74,13 +74,28 @@ public abstract class SslFactoryTest {
         Map<String, Object> serverSslConfig = sslConfigsBuilder(Mode.SERVER)
                 .createNewTrustStore(trustStoreFile)
                 .build();
-        SslFactory sslFactory = new SslFactory(Mode.SERVER);
-        sslFactory.configure(serverSslConfig);
-        //host and port are hints
-        SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
-        assertNotNull(engine);
-        assertEquals(Utils.mkSet(tlsProtocol), 
Utils.mkSet(engine.getEnabledProtocols()));
-        assertFalse(engine.getUseClientMode());
+        try (SslFactory sslFactory = new SslFactory(Mode.SERVER, null, true)) {
+            sslFactory.configure(serverSslConfig);
+            //host and port are hints
+            SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
+            assertNotNull(engine);
+            assertEquals(Utils.mkSet(tlsProtocol), 
Utils.mkSet(engine.getEnabledProtocols()));
+            assertFalse(engine.getUseClientMode());
+        }
+    }
+
+    @Test
+    public void testSslFactoryConfigWithManyKeyStoreEntries() throws Exception 
{
+        //generate server configs for keystore with multiple certificate chain
+        Map<String, Object> serverSslConfig = 
TestSslUtils.generateConfigsWithCertificateChains(tlsProtocol);
+
+        try (SslFactory sslFactory = new SslFactory(Mode.SERVER, null, true)) {
+            sslFactory.configure(serverSslConfig);
+            SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
+            assertNotNull(engine);
+            assertEquals(Utils.mkSet(tlsProtocol), 
Utils.mkSet(engine.getEnabledProtocols()));
+            assertFalse(engine.getUseClientMode());
+        }
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 675e6563f87..b351a3dd02d 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.security.auth.SslEngineFactory;
 import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
+import org.bouncycastle.asn1.ASN1EncodableVector;
 import org.bouncycastle.asn1.DEROctetString;
 import org.bouncycastle.asn1.DERSequence;
 import org.bouncycastle.asn1.DERT61String;
@@ -30,9 +31,11 @@ import org.bouncycastle.asn1.x500.RDN;
 import org.bouncycastle.asn1.x500.X500Name;
 import org.bouncycastle.asn1.x500.style.BCStyle;
 import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
+import org.bouncycastle.asn1.x509.BasicConstraints;
 import org.bouncycastle.asn1.x509.Extension;
 import org.bouncycastle.asn1.x509.GeneralName;
 import org.bouncycastle.asn1.x509.GeneralNames;
+import org.bouncycastle.asn1.x509.KeyPurposeId;
 import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.bouncycastle.cert.X509v3CertificateBuilder;
@@ -117,6 +120,29 @@ public class TestSslUtils {
         return new CertificateBuilder(days, algorithm).generate(dn, pair);
     }
 
+    /**
+     * Generate a signed certificate. Self-signed, if no issuer and 
parentKeyPair are supplied
+     * 
+     * @param dn The distinguished name of this certificate
+     * @param keyPair A key pair
+     * @param daysBeforeNow how many days before now the Certificate is valid 
for
+     * @param daysAfterNow how many days from now the Certificate is valid for
+     * @param issuer The issuer who signs the certificate. Leave null if you 
want to generate a root
+     *        CA.
+     * @param parentKeyPair The key pair of the issuer. Leave null if you want 
to generate a root
+     *        CA.
+     * @param algorithm the signing algorithm, eg "SHA1withRSA"
+     * @return the signed certificate
+     * @throws CertificateException
+     */
+    public static X509Certificate generateSignedCertificate(String dn, KeyPair 
keyPair,
+                                                            int daysBeforeNow, 
int daysAfterNow, String issuer, KeyPair parentKeyPair,
+                                                            String algorithm, 
boolean isCA, boolean isServerCert, boolean isClientCert,
+                                                            String[] 
hostNames) throws CertificateException, IOException {
+        return new CertificateBuilder(0, 
algorithm).sanDnsNames(hostNames).generateSignedCertificate(dn, keyPair,
+                daysBeforeNow, daysAfterNow, issuer, parentKeyPair, isCA, 
isServerCert, isClientCert);
+    }
+
     public static KeyPair generateKeyPair(String algorithm) throws 
NoSuchAlgorithmException {
         KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
         keyGen.initialize(algorithm.equals("EC") ? 256 : 2048);
@@ -430,6 +456,98 @@ public class TestSslUtils {
                 throw new CertificateException(e);
             }
         }
+
+        /**
+         * @param dn The distinguished name to use
+         * @param keyPair A key pair to use
+         * @param daysBeforeNow how many days before now the Certificate is 
valid for
+         * @param daysAfterNow how many days from now the Certificate is valid 
for
+         * @param issuer The issuer name. if null, "dn" is used
+         * @param parentKeyPair The parent key pair used to sign this 
certificate. If null, create
+         *        self-signed certificate authority (CA)
+         * @return A (self-) signed certificate
+         * @throws CertificateException
+         */
+        public X509Certificate generateSignedCertificate(String dn, KeyPair 
keyPair,
+                int daysBeforeNow, int daysAfterNow, String issuer, KeyPair 
parentKeyPair, boolean isCA, boolean isServerCert, boolean isClientCert)
+                throws CertificateException {
+            X500Name issuerOrDn = (issuer != null) ? new X500Name(issuer) : 
new X500Name(dn);
+            return generateSignedCertificate(new X500Name(dn), keyPair, 
daysBeforeNow, daysAfterNow,
+                    issuerOrDn, parentKeyPair, isCA, isServerCert, 
isClientCert);
+        }
+
+        /**
+         *
+         * @param dn The distinguished name to use
+         * @param keyPair A key pair to use
+         * @param daysBeforeNow how many days before now the Certificate is 
valid for
+         * @param daysAfterNow how many days from now the Certificate is valid 
for
+         * @param issuer The issuer name. if null, "dn" is used
+         * @param parentKeyPair The parent key pair used to sign this 
certificate. If null, create
+         *        self-signed certificate authority (CA)
+         * @return A (self-) signed certificate
+         * @throws CertificateException
+         */
+        public X509Certificate generateSignedCertificate(X500Name dn, KeyPair 
keyPair,
+                int daysBeforeNow, int daysAfterNow, X500Name issuer, KeyPair 
parentKeyPair, boolean isCA, boolean isServerCert, boolean isClientCert)
+                throws CertificateException {
+            try {
+                Security.addProvider(new BouncyCastleProvider());
+                AlgorithmIdentifier sigAlgId =
+                        new 
DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
+                AlgorithmIdentifier digAlgId =
+                        new 
DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId);
+                // Create self-signed certificate if no parentKeyPair has been 
specified, otherwise
+                // sign with private key of parentKeyPair
+                KeyPair signingKeyPair = (parentKeyPair != null) ? 
parentKeyPair : keyPair;
+                AsymmetricKeyParameter privateKeyAsymKeyParam =
+                        
PrivateKeyFactory.createKey(signingKeyPair.getPrivate().getEncoded());
+                SubjectPublicKeyInfo subPubKeyInfo =
+                        
SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded());
+                BcContentSignerBuilder signerBuilder;
+                String keyAlgorithm = keyPair.getPublic().getAlgorithm();
+                if (keyAlgorithm.equals("RSA"))
+                    signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, 
digAlgId);
+                else if (keyAlgorithm.equals("DSA"))
+                    signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, 
digAlgId);
+                else if (keyAlgorithm.equals("EC"))
+                    signerBuilder = new BcECContentSignerBuilder(sigAlgId, 
digAlgId);
+                else
+                    throw new IllegalArgumentException("Unsupported algorithm 
" + keyAlgorithm);
+                ContentSigner sigGen = 
signerBuilder.build(privateKeyAsymKeyParam);
+                // Negative numbers for "days" can be used to generate expired 
certificates
+                Date now = new Date();
+                Date from = new Date(now.getTime() - daysBeforeNow * 
86400000L);
+                Date to = new Date(now.getTime() + daysAfterNow * 86400000L);
+                BigInteger sn = new BigInteger(64, new SecureRandom());
+                X500Name issuerOrDn = (issuer != null) ? issuer : dn;
+                X509v3CertificateBuilder v3CertGen =
+                        new X509v3CertificateBuilder(issuerOrDn, sn, from, to, 
dn, subPubKeyInfo);
+                if (isCA) {
+                    v3CertGen.addExtension(Extension.basicConstraints, true, 
new BasicConstraints(isCA));
+                }
+                if (isServerCert || isClientCert) {
+                    ASN1EncodableVector purposes = new ASN1EncodableVector();
+                    if (isServerCert) {
+                        purposes.add(KeyPurposeId.id_kp_serverAuth);
+                    }
+                    if (isClientCert) {
+                        purposes.add(KeyPurposeId.id_kp_clientAuth);
+                    }
+                    v3CertGen.addExtension(Extension.extendedKeyUsage, false, 
new DERSequence(purposes));
+                }
+                if (subjectAltName != null) {
+                    v3CertGen.addExtension(Extension.subjectAlternativeName, 
false, subjectAltName);
+                }
+                X509CertificateHolder certificateHolder = 
v3CertGen.build(sigGen);
+                return new JcaX509CertificateConverter().setProvider("BC")
+                        .getCertificate(certificateHolder);
+            } catch (CertificateException ce) {
+                throw ce;
+            } catch (Exception e) {
+                throw new CertificateException(e);
+            }
+        }
     }
 
     public static class SslConfigsBuilder {
@@ -635,4 +753,79 @@ public class TestSslUtils {
             defaultSslEngineFactory.configure(configs);
         }
     }
+
+    /**
+     * method to generate ssl configs for keystore with large number of 
entries. This is used to verify large key stores and
+     * post-handshake messages in SslEngineValidator with TLSv3/JDK17+
+     * @param tlsProtocol
+     * @return ssl configs
+     * @throws Exception
+     */
+    public static Map<String, Object> 
generateConfigsWithCertificateChains(String tlsProtocol) throws Exception {
+        int nrOfCerts = 10;
+        KeyPair[] keyPairs = new KeyPair[nrOfCerts];
+        for (int i = 0; i < nrOfCerts; i++) {
+            keyPairs[i] = TestSslUtils.generateKeyPair("RSA");
+        }
+
+        //add a bunch of hostNames to keystore to increase the keystore size
+        String[] hostNames = new String[150];
+        for (int i = 0; i < hostNames.length; i++) {
+            hostNames[i] = "hostName" + i;
+        }
+
+        X509Certificate[] certs = new X509Certificate[nrOfCerts];
+        // Generate root CA
+        int caIndex = nrOfCerts - 1;
+        certs[caIndex] = TestSslUtils.generateSignedCertificate("CN=CA", 
keyPairs[caIndex], 365,
+                365, null, null, "SHA512withRSA", true, false, false, 
hostNames);
+
+        //Generate Intermediate certificates
+        for (int intermediateCertIndex = caIndex - 1; intermediateCertIndex > 
0; intermediateCertIndex--) {
+            certs[intermediateCertIndex] = 
TestSslUtils.generateSignedCertificate("CN=Intermediate CA" +  
intermediateCertIndex,
+                    keyPairs[intermediateCertIndex], 365, 365, 
certs[intermediateCertIndex + 1].getSubjectX500Principal().getName(),
+                    keyPairs[intermediateCertIndex + 1], "SHA512withRSA", 
true, false, false, hostNames);
+        }
+
+        // Generate a valid end certificate
+        certs[0] = TestSslUtils.generateSignedCertificate("CN=kafka", 
keyPairs[0], 1, 1,
+                certs[1].getSubjectX500Principal().getName(), keyPairs[1], 
"SHA512withRSA", false, true, true, hostNames);
+
+        File keystoreStoreFile = TestUtils.tempFile("keystore", ".jks");
+        Password keyStorePassword =  new Password("password");
+        KeyStore keyStore = KeyStore.getInstance("PKCS12");
+        keyStore.load(null, null);
+        keyStore.setKeyEntry("issued-cert", keyPairs[0].getPrivate(), 
keyStorePassword.value().toCharArray(), certs);
+        saveKeyStore(keyStore, keystoreStoreFile.getPath(), keyStorePassword);
+
+        File trustStoreFile = TestUtils.tempFile("truststore", ".jks");
+        Password trustStorePassword =  new Password("password");
+        KeyStore trustStore = KeyStore.getInstance("PKCS12");
+        trustStore.load(null, null);
+        for (X509Certificate cert : certs) {
+            
trustStore.setCertificateEntry(cert.getSubjectX500Principal().getName(), cert);
+        }
+        saveKeyStore(trustStore, trustStoreFile.getPath(), trustStorePassword);
+
+        Map<String, Object> sslConfigs = new HashMap<>();
+
+        sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocol); // 
protocol to create SSLContext
+        sslConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 
keystoreStoreFile.getPath());
+        sslConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
+        sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, 
TrustManagerFactory.getDefaultAlgorithm());
+        sslConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
keyStorePassword);
+        sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyStorePassword);
+
+        sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
trustStoreFile.getPath());
+        sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
trustStorePassword);
+        sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
+        sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, 
TrustManagerFactory.getDefaultAlgorithm());
+
+        List<String> enabledProtocols  = new ArrayList<>();
+        enabledProtocols.add(tlsProtocol);
+        sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
enabledProtocols);
+
+        return sslConfigs;
+    }
+
 }

Reply via email to