This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push: new 728666f3ad6 KAFKA-15502: Update SslEngineValidator to handle large stores (#14445) 728666f3ad6 is described below commit 728666f3ad61e9bd49719ee7220bb2dd946fcb55 Author: Manikumar Reddy <manikumar.re...@gmail.com> AuthorDate: Mon Sep 25 22:46:30 2023 -0700 KAFKA-15502: Update SslEngineValidator to handle large stores (#14445) We have observed an issue where inter broker SSL listener is not coming up when running with TLSv3/JDK 17 . SSL debug logs shows that TLSv3 post handshake messages >16K are not getting read and causing SslEngineValidator process to stuck while validating the provided trust/key store. - Right now, WRAP returns if there is already data in the buffer. But if we need more data to be wrapped for UNWRAP to succeed, we end up looping forever. To fix this, now we always attempt WRAP and only return early on BUFFER_OVERFLOW. - Update SslEngineValidator to unwrap post-handshake messages from peer when local handshake status is FINISHED. Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> --- .../kafka/common/security/ssl/SslFactory.java | 58 ++++--- .../kafka/common/security/ssl/SslFactoryTest.java | 29 +++- .../java/org/apache/kafka/test/TestSslUtils.java | 193 +++++++++++++++++++++ 3 files changed, 251 insertions(+), 29 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index 65c37aa6b47..a0a7f7239ee 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -419,12 +419,12 @@ public class SslFactory implements Reconfigurable, Closeable { while (true) { switch (handshakeStatus) { case NEED_WRAP: - if (netBuffer.position() != 0) // Wait for peer to consume previously wrapped data - return; handshakeResult = sslEngine.wrap(EMPTY_BUF, netBuffer); switch (handshakeResult.getStatus()) { case OK: break; case BUFFER_OVERFLOW: + if (netBuffer.position() != 0) // Wait for peer to consume previously wrapped data + return; netBuffer.compact(); netBuffer = Utils.ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize()); netBuffer.flip(); @@ -436,24 +436,8 @@ public class SslFactory implements Reconfigurable, Closeable { } return; case NEED_UNWRAP: - if (peerValidator.netBuffer.position() == 0) // no data to unwrap, return to process peer - return; - peerValidator.netBuffer.flip(); // unwrap the data from peer - handshakeResult = sslEngine.unwrap(peerValidator.netBuffer, appBuffer); - peerValidator.netBuffer.compact(); - handshakeStatus = handshakeResult.getHandshakeStatus(); - switch (handshakeResult.getStatus()) { - case OK: break; - case BUFFER_OVERFLOW: - appBuffer = Utils.ensureCapacity(appBuffer, sslEngine.getSession().getApplicationBufferSize()); - break; - case BUFFER_UNDERFLOW: - netBuffer = Utils.ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize()); - break; - case CLOSED: - default: - throw new SSLException("Unexpected handshake status: " + handshakeResult.getStatus()); - } + handshakeStatus = unwrap(peerValidator, true); + if (handshakeStatus == null) return; break; case NEED_TASK: sslEngine.getDelegatedTask().run(); @@ -463,14 +447,44 @@ public class SslFactory implements Reconfigurable, Closeable { return; case NOT_HANDSHAKING: if (handshakeResult.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.FINISHED) - throw new SSLException("Did not finish handshake"); + throw new SSLException("Did not finish handshake, handshake status: " + handshakeResult.getHandshakeStatus()); + else if (peerValidator.netBuffer.position() != 0) { + unwrap(peerValidator, false); + } return; default: - throw new IllegalStateException("Unexpected handshake status " + handshakeStatus); + throw new IllegalStateException("Unexpected handshake status: " + handshakeStatus); } } } + private SSLEngineResult.HandshakeStatus unwrap(SslEngineValidator peerValidator, boolean updateHandshakeResult) throws SSLException { + // Unwrap regardless of whether there is data in the buffer to ensure that + // handshake status is updated if required. + peerValidator.netBuffer.flip(); // unwrap the data from peer + SSLEngineResult sslEngineResult = sslEngine.unwrap(peerValidator.netBuffer, appBuffer); + if (updateHandshakeResult) { + handshakeResult = sslEngineResult; + } + peerValidator.netBuffer.compact(); + SSLEngineResult.HandshakeStatus handshakeStatus = sslEngineResult.getHandshakeStatus(); + switch (sslEngineResult.getStatus()) { + case OK: break; + case BUFFER_OVERFLOW: + appBuffer = Utils.ensureCapacity(appBuffer, sslEngine.getSession().getApplicationBufferSize()); + break; + case BUFFER_UNDERFLOW: + netBuffer = Utils.ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize()); + // BUFFER_UNDERFLOW typically indicates that we need more data from peer, + // so return to process peer. + return null; + case CLOSED: + default: + throw new SSLException("Unexpected handshake status: " + sslEngineResult.getStatus()); + } + return handshakeStatus; + } + boolean complete() { return sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED || sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING; diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java index 7ac707b5de7..922166020ff 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java @@ -74,13 +74,28 @@ public abstract class SslFactoryTest { Map<String, Object> serverSslConfig = sslConfigsBuilder(Mode.SERVER) .createNewTrustStore(trustStoreFile) .build(); - SslFactory sslFactory = new SslFactory(Mode.SERVER); - sslFactory.configure(serverSslConfig); - //host and port are hints - SSLEngine engine = sslFactory.createSslEngine("localhost", 0); - assertNotNull(engine); - assertEquals(Utils.mkSet(tlsProtocol), Utils.mkSet(engine.getEnabledProtocols())); - assertFalse(engine.getUseClientMode()); + try (SslFactory sslFactory = new SslFactory(Mode.SERVER, null, true)) { + sslFactory.configure(serverSslConfig); + //host and port are hints + SSLEngine engine = sslFactory.createSslEngine("localhost", 0); + assertNotNull(engine); + assertEquals(Utils.mkSet(tlsProtocol), Utils.mkSet(engine.getEnabledProtocols())); + assertFalse(engine.getUseClientMode()); + } + } + + @Test + public void testSslFactoryConfigWithManyKeyStoreEntries() throws Exception { + //generate server configs for keystore with multiple certificate chain + Map<String, Object> serverSslConfig = TestSslUtils.generateConfigsWithCertificateChains(tlsProtocol); + + try (SslFactory sslFactory = new SslFactory(Mode.SERVER, null, true)) { + sslFactory.configure(serverSslConfig); + SSLEngine engine = sslFactory.createSslEngine("localhost", 0); + assertNotNull(engine); + assertEquals(Utils.mkSet(tlsProtocol), Utils.mkSet(engine.getEnabledProtocols())); + assertFalse(engine.getUseClientMode()); + } } @Test diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index 675e6563f87..b351a3dd02d 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.security.auth.SslEngineFactory; import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory; +import org.bouncycastle.asn1.ASN1EncodableVector; import org.bouncycastle.asn1.DEROctetString; import org.bouncycastle.asn1.DERSequence; import org.bouncycastle.asn1.DERT61String; @@ -30,9 +31,11 @@ import org.bouncycastle.asn1.x500.RDN; import org.bouncycastle.asn1.x500.X500Name; import org.bouncycastle.asn1.x500.style.BCStyle; import org.bouncycastle.asn1.x509.AlgorithmIdentifier; +import org.bouncycastle.asn1.x509.BasicConstraints; import org.bouncycastle.asn1.x509.Extension; import org.bouncycastle.asn1.x509.GeneralName; import org.bouncycastle.asn1.x509.GeneralNames; +import org.bouncycastle.asn1.x509.KeyPurposeId; import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; import org.bouncycastle.cert.X509CertificateHolder; import org.bouncycastle.cert.X509v3CertificateBuilder; @@ -117,6 +120,29 @@ public class TestSslUtils { return new CertificateBuilder(days, algorithm).generate(dn, pair); } + /** + * Generate a signed certificate. Self-signed, if no issuer and parentKeyPair are supplied + * + * @param dn The distinguished name of this certificate + * @param keyPair A key pair + * @param daysBeforeNow how many days before now the Certificate is valid for + * @param daysAfterNow how many days from now the Certificate is valid for + * @param issuer The issuer who signs the certificate. Leave null if you want to generate a root + * CA. + * @param parentKeyPair The key pair of the issuer. Leave null if you want to generate a root + * CA. + * @param algorithm the signing algorithm, eg "SHA1withRSA" + * @return the signed certificate + * @throws CertificateException + */ + public static X509Certificate generateSignedCertificate(String dn, KeyPair keyPair, + int daysBeforeNow, int daysAfterNow, String issuer, KeyPair parentKeyPair, + String algorithm, boolean isCA, boolean isServerCert, boolean isClientCert, + String[] hostNames) throws CertificateException, IOException { + return new CertificateBuilder(0, algorithm).sanDnsNames(hostNames).generateSignedCertificate(dn, keyPair, + daysBeforeNow, daysAfterNow, issuer, parentKeyPair, isCA, isServerCert, isClientCert); + } + public static KeyPair generateKeyPair(String algorithm) throws NoSuchAlgorithmException { KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm); keyGen.initialize(algorithm.equals("EC") ? 256 : 2048); @@ -430,6 +456,98 @@ public class TestSslUtils { throw new CertificateException(e); } } + + /** + * @param dn The distinguished name to use + * @param keyPair A key pair to use + * @param daysBeforeNow how many days before now the Certificate is valid for + * @param daysAfterNow how many days from now the Certificate is valid for + * @param issuer The issuer name. if null, "dn" is used + * @param parentKeyPair The parent key pair used to sign this certificate. If null, create + * self-signed certificate authority (CA) + * @return A (self-) signed certificate + * @throws CertificateException + */ + public X509Certificate generateSignedCertificate(String dn, KeyPair keyPair, + int daysBeforeNow, int daysAfterNow, String issuer, KeyPair parentKeyPair, boolean isCA, boolean isServerCert, boolean isClientCert) + throws CertificateException { + X500Name issuerOrDn = (issuer != null) ? new X500Name(issuer) : new X500Name(dn); + return generateSignedCertificate(new X500Name(dn), keyPair, daysBeforeNow, daysAfterNow, + issuerOrDn, parentKeyPair, isCA, isServerCert, isClientCert); + } + + /** + * + * @param dn The distinguished name to use + * @param keyPair A key pair to use + * @param daysBeforeNow how many days before now the Certificate is valid for + * @param daysAfterNow how many days from now the Certificate is valid for + * @param issuer The issuer name. if null, "dn" is used + * @param parentKeyPair The parent key pair used to sign this certificate. If null, create + * self-signed certificate authority (CA) + * @return A (self-) signed certificate + * @throws CertificateException + */ + public X509Certificate generateSignedCertificate(X500Name dn, KeyPair keyPair, + int daysBeforeNow, int daysAfterNow, X500Name issuer, KeyPair parentKeyPair, boolean isCA, boolean isServerCert, boolean isClientCert) + throws CertificateException { + try { + Security.addProvider(new BouncyCastleProvider()); + AlgorithmIdentifier sigAlgId = + new DefaultSignatureAlgorithmIdentifierFinder().find(algorithm); + AlgorithmIdentifier digAlgId = + new DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId); + // Create self-signed certificate if no parentKeyPair has been specified, otherwise + // sign with private key of parentKeyPair + KeyPair signingKeyPair = (parentKeyPair != null) ? parentKeyPair : keyPair; + AsymmetricKeyParameter privateKeyAsymKeyParam = + PrivateKeyFactory.createKey(signingKeyPair.getPrivate().getEncoded()); + SubjectPublicKeyInfo subPubKeyInfo = + SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded()); + BcContentSignerBuilder signerBuilder; + String keyAlgorithm = keyPair.getPublic().getAlgorithm(); + if (keyAlgorithm.equals("RSA")) + signerBuilder = new BcRSAContentSignerBuilder(sigAlgId, digAlgId); + else if (keyAlgorithm.equals("DSA")) + signerBuilder = new BcDSAContentSignerBuilder(sigAlgId, digAlgId); + else if (keyAlgorithm.equals("EC")) + signerBuilder = new BcECContentSignerBuilder(sigAlgId, digAlgId); + else + throw new IllegalArgumentException("Unsupported algorithm " + keyAlgorithm); + ContentSigner sigGen = signerBuilder.build(privateKeyAsymKeyParam); + // Negative numbers for "days" can be used to generate expired certificates + Date now = new Date(); + Date from = new Date(now.getTime() - daysBeforeNow * 86400000L); + Date to = new Date(now.getTime() + daysAfterNow * 86400000L); + BigInteger sn = new BigInteger(64, new SecureRandom()); + X500Name issuerOrDn = (issuer != null) ? issuer : dn; + X509v3CertificateBuilder v3CertGen = + new X509v3CertificateBuilder(issuerOrDn, sn, from, to, dn, subPubKeyInfo); + if (isCA) { + v3CertGen.addExtension(Extension.basicConstraints, true, new BasicConstraints(isCA)); + } + if (isServerCert || isClientCert) { + ASN1EncodableVector purposes = new ASN1EncodableVector(); + if (isServerCert) { + purposes.add(KeyPurposeId.id_kp_serverAuth); + } + if (isClientCert) { + purposes.add(KeyPurposeId.id_kp_clientAuth); + } + v3CertGen.addExtension(Extension.extendedKeyUsage, false, new DERSequence(purposes)); + } + if (subjectAltName != null) { + v3CertGen.addExtension(Extension.subjectAlternativeName, false, subjectAltName); + } + X509CertificateHolder certificateHolder = v3CertGen.build(sigGen); + return new JcaX509CertificateConverter().setProvider("BC") + .getCertificate(certificateHolder); + } catch (CertificateException ce) { + throw ce; + } catch (Exception e) { + throw new CertificateException(e); + } + } } public static class SslConfigsBuilder { @@ -635,4 +753,79 @@ public class TestSslUtils { defaultSslEngineFactory.configure(configs); } } + + /** + * method to generate ssl configs for keystore with large number of entries. This is used to verify large key stores and + * post-handshake messages in SslEngineValidator with TLSv3/JDK17+ + * @param tlsProtocol + * @return ssl configs + * @throws Exception + */ + public static Map<String, Object> generateConfigsWithCertificateChains(String tlsProtocol) throws Exception { + int nrOfCerts = 10; + KeyPair[] keyPairs = new KeyPair[nrOfCerts]; + for (int i = 0; i < nrOfCerts; i++) { + keyPairs[i] = TestSslUtils.generateKeyPair("RSA"); + } + + //add a bunch of hostNames to keystore to increase the keystore size + String[] hostNames = new String[150]; + for (int i = 0; i < hostNames.length; i++) { + hostNames[i] = "hostName" + i; + } + + X509Certificate[] certs = new X509Certificate[nrOfCerts]; + // Generate root CA + int caIndex = nrOfCerts - 1; + certs[caIndex] = TestSslUtils.generateSignedCertificate("CN=CA", keyPairs[caIndex], 365, + 365, null, null, "SHA512withRSA", true, false, false, hostNames); + + //Generate Intermediate certificates + for (int intermediateCertIndex = caIndex - 1; intermediateCertIndex > 0; intermediateCertIndex--) { + certs[intermediateCertIndex] = TestSslUtils.generateSignedCertificate("CN=Intermediate CA" + intermediateCertIndex, + keyPairs[intermediateCertIndex], 365, 365, certs[intermediateCertIndex + 1].getSubjectX500Principal().getName(), + keyPairs[intermediateCertIndex + 1], "SHA512withRSA", true, false, false, hostNames); + } + + // Generate a valid end certificate + certs[0] = TestSslUtils.generateSignedCertificate("CN=kafka", keyPairs[0], 1, 1, + certs[1].getSubjectX500Principal().getName(), keyPairs[1], "SHA512withRSA", false, true, true, hostNames); + + File keystoreStoreFile = TestUtils.tempFile("keystore", ".jks"); + Password keyStorePassword = new Password("password"); + KeyStore keyStore = KeyStore.getInstance("PKCS12"); + keyStore.load(null, null); + keyStore.setKeyEntry("issued-cert", keyPairs[0].getPrivate(), keyStorePassword.value().toCharArray(), certs); + saveKeyStore(keyStore, keystoreStoreFile.getPath(), keyStorePassword); + + File trustStoreFile = TestUtils.tempFile("truststore", ".jks"); + Password trustStorePassword = new Password("password"); + KeyStore trustStore = KeyStore.getInstance("PKCS12"); + trustStore.load(null, null); + for (X509Certificate cert : certs) { + trustStore.setCertificateEntry(cert.getSubjectX500Principal().getName(), cert); + } + saveKeyStore(trustStore, trustStoreFile.getPath(), trustStorePassword); + + Map<String, Object> sslConfigs = new HashMap<>(); + + sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocol); // protocol to create SSLContext + sslConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreStoreFile.getPath()); + sslConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS"); + sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm()); + sslConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword); + sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyStorePassword); + + sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath()); + sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword); + sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS"); + sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm()); + + List<String> enabledProtocols = new ArrayList<>(); + enabledProtocols.add(tlsProtocol); + sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols); + + return sslConfigs; + } + }