jonmeredith commented on code in PR #2372:
URL: https://github.com/apache/cassandra/pull/2372#discussion_r1228468711


##########
src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.net.InetAddress;
+import java.security.cert.Certificate;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/*
+ * Performs mTLS authentication for client connections by extracting 
identities from client certificate
+ * and verifying them against the authorized identities in IdentityCache. 
IdentityCache is a loading cache that
+ * refreshes values on timely basis.
+ *
+ * During a client connection, after SSL handshake, identity of certificate is 
extracted using the certificate validator
+ * and is verified whether the value exists in the cache or not. If it exists 
access is granted, otherwise, the connection
+ * is rejected.
+ *
+ * Authenticator & Certificate validator can be configured using 
cassandra.yaml, one can write their own mTLS certificate
+ * validator and configure it in cassandra.yaml.Below is an example on how to 
configure validator.
+ * note that this example uses SPIFFE based validator, It could be any other 
validator with any defined identifier format.
+ *
+ * Example:
+ * authenticator:
+ *   class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+ *   parameters :
+ *     validator_class_name: 
org.apache.cassandra.auth.SpiffeCertificateValidator
+ */
+public class MutualTlsAuthenticator implements IAuthenticator
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MutualTlsAuthenticator.class);
+    private static final NoSpamLogger nospamLogger = 
NoSpamLogger.getLogger(logger, 1L, TimeUnit.MINUTES);
+    private static final String VALIDATOR_CLASS_NAME = "validator_class_name";
+    private static final String CACHE_NAME = "IdentitiesCache";
+    private final IdentityCache identityCache = new IdentityCache();
+    private final MutualTlsCertificateValidator certificateValidator;
+
+    public MutualTlsAuthenticator(Map<String, String> parameters)
+    {
+        final String certificateValidatorClassName = 
parameters.get(VALIDATOR_CLASS_NAME);
+        if (StringUtils.isEmpty(certificateValidatorClassName))
+        {
+            String message ="authenticator.parameters.validator_class_name is 
not set";
+            logger.error(message);
+            throw new ConfigurationException(message);
+        }
+        certificateValidator = ParameterizedClass.newInstance(new 
ParameterizedClass(certificateValidatorClassName),
+                                                              
Arrays.asList("", AuthConfig.class.getPackage().getName()));
+        checkMtlsConfigurationIsValid(DatabaseDescriptor.getRawConfig());
+        AuthCacheService.instance.register(identityCache);
+    }
+
+    @Override
+    public boolean requireAuthentication()
+    {
+        return true;
+    }
+
+    @Override
+    public Set<? extends IResource> protectedResources()
+    {
+        return 
ImmutableSet.of(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME, 
AuthKeyspace.ROLES));
+    }
+
+    @Override
+    public void validateConfiguration() throws ConfigurationException
+    {
+
+    }
+
+    @Override
+    public void setup()
+    {
+        identityCache.warm();
+    }
+
+    @Override
+    public SaslNegotiator newSaslNegotiator(InetAddress clientAddress)
+    {
+        return null;
+    }
+
+    @Override
+    public SaslNegotiator newSaslNegotiator(InetAddress clientAddress, 
Certificate[] certificates)
+    {
+        return new CertificateNegotiator(certificates);
+    }
+
+    @Override
+    public AuthenticatedUser legacyAuthenticate(Map<String, String> 
credentials) throws AuthenticationException
+    {
+        return null;
+    }
+
+    @VisibleForTesting
+    class CertificateNegotiator implements SaslNegotiator
+    {
+        private final Certificate[] clientCertificateChain;
+
+        private CertificateNegotiator(final Certificate[] 
clientCertificateChain)
+        {
+            this.clientCertificateChain = clientCertificateChain;
+        }
+
+        @Override
+        public byte[] evaluateResponse(byte[] clientResponse) throws 
AuthenticationException
+        {
+            return null;
+        }
+
+        @Override
+        public boolean isComplete()
+        {
+            return true;
+        }
+
+        @Override
+        public AuthenticatedUser getAuthenticatedUser() throws 
AuthenticationException
+        {
+            if 
(!certificateValidator.isValidCertificate(clientCertificateChain))
+            {
+                String message = "Invalid or not supported certificate";
+                nospamLogger.error(message);
+                throw new AuthenticationException(message);
+            }
+
+            final String identity = 
certificateValidator.identity(clientCertificateChain);
+            if (StringUtils.isEmpty(identity))
+            {
+                String msg = "Unable to extract client identity from 
certificate for authentication";
+                nospamLogger.error(msg);
+                throw new AuthenticationException(msg);
+            }
+            String role = identityCache.get(identity);
+            if (role == null)
+            {
+                String msg = "None of the users " + identity + " are 
authorized";
+                nospamLogger.error(msg);
+                throw new AuthenticationException(msg);

Review Comment:
   nit: this is a possibility for log injection - the identity provided in the 
client certificate could include text to confuse a log scanner,  e.g. an 
identity of "%property{my_sensitive_property}" could cause it to be logged and 
be exploited if any more log4shell type vulnerabilities are discovered.
   
   You could use `org.slf4j.helpers.MessageFormatter` to create the `msg` 
instead, that would also reduce the size of the map nospamLogger might create 
for previous timestamps.
   ```suggestion
                   String msg = "None of the users {} are authorized";
                   nospamLogger.error(msg, identity);
                   throw new 
AuthenticationException(MessageFormatter.format(msg, identity).getMessage());
   ```



##########
src/java/org/apache/cassandra/auth/SpiffeCertificateValidator.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.exceptions.AuthenticationException;
+
+/**
+ * This class assumes that the identity of a certificate is SPIFFE which is a 
URI that is present as part of the SAN
+ * of the client certificate. It has logic to extract identity (Spiffe) out of 
a certificate & knows how to validate
+ * the client certificates.
+ * <p>
+ *
+ * <p>
+ * Example:
+ * internode_authenticator:
+ * class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+ * parameters :
+ * validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+ * authenticator:
+ * class_name : org.apache.cassandra.auth.MutualTlsInternodeAuthenticator
+ * parameters :
+ * validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+ */
+public class SpiffeCertificateValidator implements 
MutualTlsCertificateValidator
+{
+    @Override
+    public boolean isValidCertificate(Certificate[] clientCertificateChain)
+    {
+        return true;
+    }
+
+    @Override
+    public String identity(Certificate[] clientCertificateChain) throws 
AuthenticationException
+    {
+        // returns spiffe
+        try
+        {
+            return getSANSpiffe(clientCertificateChain);
+        }
+        catch (CertificateException e)
+        {
+            throw new AuthenticationException(e.getMessage(), e);
+        }
+    }
+
+    private static String getSANSpiffe(final Certificate[] clientCertificates) 
throws CertificateException
+    {
+        int URI_TYPE = 6;
+        X509Certificate[] castedCerts = castCertsToX509(clientCertificates);
+        Collection<List<?>> subjectAltNames = 
castedCerts[0].getSubjectAlternativeNames();
+
+        if (subjectAltNames != null)
+        {
+            for (List<?> item : subjectAltNames)
+            {
+                Integer type = (Integer) item.get(0);
+                String spiffe = (String) item.get(1);
+                if (type == URI_TYPE && spiffe.contains("spiffe"))

Review Comment:
   could we strengthen this to `spiffe.startsWith("spiffe://")` or at least 
include the `://` if you're concerned about certificates with whitespace before 
the spiffe.  I would go for the first option given this is security related.



##########
test/unit/org/apache/cassandra/auth/MutualTlsInternodeAuthenticatorTest.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.auth;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.auth.AuthTestUtils.loadCertificateChain;
+import static 
org.apache.cassandra.auth.IInternodeAuthenticator.InternodeConnectionDirection.INBOUND;
+import static 
org.apache.cassandra.auth.IInternodeAuthenticator.InternodeConnectionDirection.OUTBOUND;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_CONFIG;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MutualTlsInternodeAuthenticatorTest
+{
+    private static final String VALIDATOR_CLASS_NAME = "validator_class_name";
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    @Parameterized.Parameter(0)
+    public String certificatePath;
+    @Parameterized.Parameter(1)
+    public String identity;
+
+    @Parameterized.Parameters()
+    public static Collection<Object[]> versions()
+    {
+        return Collections.singletonList(new Object[]{ 
"auth/SampleMtlsClientCertificate.pem", 
"spiffe://testdomain.com/testIdentifier/testValue" });
+    }
+
+    @BeforeClass
+    public static void initialize()
+    {
+        CASSANDRA_CONFIG.setString("cassandra-mtls.yaml");
+        SchemaLoader.loadSchema();
+        DatabaseDescriptor.daemonInitialization();
+        StorageService.instance.initServer(0);
+    }
+
+    @Before
+    public void before()
+    {
+        Config config = DatabaseDescriptor.getRawConfig();
+        config.server_encryption_options = 
config.server_encryption_options.withOutboundKeystore("test/conf/cassandra_ssl_test_outbound.keystore")
+                                                                           
.withOutboundKeystorePassword("cassandra");

Review Comment:
   Do these test stores have any expiry times? If so how would somebody 
regenerate them to fix tests.



##########
test/unit/org/apache/cassandra/config/ConfigCompatabilityTest.java:
##########
@@ -88,6 +88,14 @@
                                                      
.add("streaming_socket_timeout_in_ms") // CASSANDRA-12229
                                                      .build();
 
+    private static final Set<String> EXPECTED_ERRORS_403 = 
ImmutableSet.<String>builder()

Review Comment:
   should probably rename this `EXPECTED_BEFORE_50`?



##########
src/java/org/apache/cassandra/auth/MutualTlsInternodeAuthenticator.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.nio.file.Files;
+import java.nio.file.Paths; // checkstyle: permit this import
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/*
+ * Performs mTLS authentication for internode connections by extracting 
identities from outbound certificates
+ * configured for the cassandra instance.
+ *
+ * If the certificate identity is same as the identity present in the outbound 
keystore, the user is authorized
+ * Otherwise, the connection will fail. A bounce is not required when a new 
node is added or removed.
+ *
+ */
+public class MutualTlsInternodeAuthenticator implements IInternodeAuthenticator
+{
+    private static final String VALIDATOR_CLASS_NAME = "validator_class_name";
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 
30L, TimeUnit.SECONDS);
+    private final MutualTlsCertificateValidator certificateValidator;
+    private final List<String> validIdentities;
+
+    public MutualTlsInternodeAuthenticator(Map<String, String> parameters)
+    {
+        String certificateValidatorClassName = 
parameters.get(VALIDATOR_CLASS_NAME);
+        if (StringUtils.isEmpty(certificateValidatorClassName))
+        {
+            String message = 
"internode_authenticator.parameters.validator_class_name is not set";
+            logger.error(message);
+            throw new ConfigurationException(message);
+        }
+
+        certificateValidator = ParameterizedClass.newInstance(new 
ParameterizedClass(certificateValidatorClassName),
+                                                              
Arrays.asList("", AuthConfig.class.getPackage().getName()));
+        Config config = DatabaseDescriptor.getRawConfig();
+        checkInternodeMtlsConfigurationIsValid(config);
+        validIdentities = 
getIdentitiesFromKeyStore(config.server_encryption_options.outbound_keystore,
+                                                                  
config.server_encryption_options.outbound_keystore_password,
+                                                                  
config.server_encryption_options.store_type);
+
+        if (!validIdentities.isEmpty())
+        {
+            logger.info("Initializing internode authenticator with identities 
{}", validIdentities);
+        }
+        else
+        {
+            String message = String.format("No identity was extracted from the 
outbound keystore '%s'", config.server_encryption_options.outbound_keystore);
+            logger.info(message);
+            throw new ConfigurationException(message);
+        }
+    }
+
+    @Override
+    public boolean authenticate(InetAddress remoteAddress, int remotePort)
+    {
+        throw new UnsupportedOperationException("mTLS Authenticator only 
supports certificate based authenticate method");
+    }
+
+    @Override
+    public boolean authenticate(InetAddress remoteAddress, int remotePort, 
Certificate[] certificates, InternodeConnectionDirection connectionType)
+    {
+        return authenticateInternodeWithMtls(remoteAddress, remotePort, 
certificates, connectionType);
+    }
+
+
+    @Override
+    public void validateConfiguration() throws ConfigurationException
+    {
+
+    }
+
+    protected boolean authenticateInternodeWithMtls(InetAddress remoteAddress, 
int remotePort, Certificate[] certificates,
+                                                    
IInternodeAuthenticator.InternodeConnectionDirection connectionType)
+    {
+        if (connectionType == 
IInternodeAuthenticator.InternodeConnectionDirection.INBOUND)
+        {
+            String identity = certificateValidator.identity(certificates);
+            if (!certificateValidator.isValidCertificate(certificates))
+            {
+                noSpamLogger.error("Not a valid outbound certificate {}", 
identity);

Review Comment:
   should this be `"Not a valid inbound certificate {}"`?



##########
src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.net.InetAddress;
+import java.security.cert.Certificate;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/*
+ * Performs mTLS authentication for client connections by extracting 
identities from client certificate
+ * and verifying them against the authorized identities in IdentityCache. 
IdentityCache is a loading cache that
+ * refreshes values on timely basis.
+ *
+ * During a client connection, after SSL handshake, identity of certificate is 
extracted using the certificate validator
+ * and is verified whether the value exists in the cache or not. If it exists 
access is granted, otherwise, the connection
+ * is rejected.
+ *
+ * Authenticator & Certificate validator can be configured using 
cassandra.yaml, one can write their own mTLS certificate
+ * validator and configure it in cassandra.yaml.Below is an example on how to 
configure validator.
+ * note that this example uses SPIFFE based validator, It could be any other 
validator with any defined identifier format.
+ *
+ * Example:
+ * authenticator:
+ *   class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+ *   parameters :
+ *     validator_class_name: 
org.apache.cassandra.auth.SpiffeCertificateValidator
+ */
+public class MutualTlsAuthenticator implements IAuthenticator
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MutualTlsAuthenticator.class);
+    private static final NoSpamLogger nospamLogger = 
NoSpamLogger.getLogger(logger, 1L, TimeUnit.MINUTES);
+    private static final String VALIDATOR_CLASS_NAME = "validator_class_name";
+    private static final String CACHE_NAME = "IdentitiesCache";
+    private final IdentityCache identityCache = new IdentityCache();
+    private final MutualTlsCertificateValidator certificateValidator;
+
+    public MutualTlsAuthenticator(Map<String, String> parameters)
+    {
+        final String certificateValidatorClassName = 
parameters.get(VALIDATOR_CLASS_NAME);
+        if (StringUtils.isEmpty(certificateValidatorClassName))
+        {
+            String message ="authenticator.parameters.validator_class_name is 
not set";
+            logger.error(message);
+            throw new ConfigurationException(message);
+        }
+        certificateValidator = ParameterizedClass.newInstance(new 
ParameterizedClass(certificateValidatorClassName),
+                                                              
Arrays.asList("", AuthConfig.class.getPackage().getName()));
+        checkMtlsConfigurationIsValid(DatabaseDescriptor.getRawConfig());
+        AuthCacheService.instance.register(identityCache);
+    }
+
+    @Override
+    public boolean requireAuthentication()
+    {
+        return true;
+    }
+
+    @Override
+    public Set<? extends IResource> protectedResources()
+    {
+        return 
ImmutableSet.of(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME, 
AuthKeyspace.ROLES));
+    }
+
+    @Override
+    public void validateConfiguration() throws ConfigurationException
+    {
+
+    }
+
+    @Override
+    public void setup()
+    {
+        identityCache.warm();
+    }
+
+    @Override
+    public SaslNegotiator newSaslNegotiator(InetAddress clientAddress)
+    {
+        return null;
+    }
+
+    @Override
+    public SaslNegotiator newSaslNegotiator(InetAddress clientAddress, 
Certificate[] certificates)
+    {
+        return new CertificateNegotiator(certificates);
+    }
+
+    @Override
+    public AuthenticatedUser legacyAuthenticate(Map<String, String> 
credentials) throws AuthenticationException
+    {
+        return null;
+    }
+
+    @VisibleForTesting
+    class CertificateNegotiator implements SaslNegotiator
+    {
+        private final Certificate[] clientCertificateChain;
+
+        private CertificateNegotiator(final Certificate[] 
clientCertificateChain)
+        {
+            this.clientCertificateChain = clientCertificateChain;
+        }
+
+        @Override
+        public byte[] evaluateResponse(byte[] clientResponse) throws 
AuthenticationException
+        {
+            return null;
+        }
+
+        @Override
+        public boolean isComplete()
+        {
+            return true;
+        }
+
+        @Override
+        public AuthenticatedUser getAuthenticatedUser() throws 
AuthenticationException
+        {
+            if 
(!certificateValidator.isValidCertificate(clientCertificateChain))
+            {
+                String message = "Invalid or not supported certificate";
+                nospamLogger.error(message);
+                throw new AuthenticationException(message);
+            }
+
+            final String identity = 
certificateValidator.identity(clientCertificateChain);
+            if (StringUtils.isEmpty(identity))
+            {
+                String msg = "Unable to extract client identity from 
certificate for authentication";
+                nospamLogger.error(msg);
+                throw new AuthenticationException(msg);
+            }
+            String role = identityCache.get(identity);
+            if (role == null)
+            {
+                String msg = "None of the users " + identity + " are 
authorized";
+                nospamLogger.error(msg);
+                throw new AuthenticationException(msg);

Review Comment:
   `Certificate identity '{}' not authorized` might also make a clearer message 
in logs and exceptions - as I think there's only one identity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to