This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f078c02cb5 Adding Mutual TLS authenticators for client & internode 
connections
f078c02cb5 is described below

commit f078c02cb58bddd735490b07548f7352f0eb09aa
Author: jkonisa <jkon...@apple.com>
AuthorDate: Mon May 22 22:15:22 2023 -0700

    Adding Mutual TLS authenticators for client & internode connections
    
    Patch by Jyothsna Konisa & Dinesh Joshi; reviewed by Yifan Cai, Jon 
Meredith,
    Yuki Morishita & Dinesh Joshi for CASSANDRA-18554
    
    Co-Authored-By: Dinesh Joshi <djo...@apache.org>
---
 conf/cassandra.yaml                                |  22 +-
 .../pages/developing/cql/cql_singlefile.adoc       |  51 +++++
 .../pages/getting-started/mtlsauthenticators.adoc  | 133 ++++++++++++
 src/antlr/Lexer.g                                  |   1 +
 src/antlr/Parser.g                                 |  31 +++
 .../apache/cassandra/audit/AuditLogEntryType.java  |   2 +
 src/java/org/apache/cassandra/auth/AuthConfig.java |  20 +-
 .../org/apache/cassandra/auth/AuthKeyspace.java    |  14 +-
 .../cassandra/auth/CassandraRoleManager.java       | 114 +++++++++++
 .../cassandra/auth/IInternodeAuthenticator.java    |   2 +-
 .../org/apache/cassandra/auth/IRoleManager.java    |  56 +++++
 .../cassandra/auth/MutualTlsAuthenticator.java     | 208 +++++++++++++++++++
 .../auth/MutualTlsCertificateValidator.java        |  63 ++++++
 .../auth/MutualTlsInternodeAuthenticator.java      | 226 +++++++++++++++++++++
 ...MutualTlsWithPasswordFallbackAuthenticator.java |  53 +++++
 .../cassandra/auth/PasswordAuthenticator.java      |   3 +-
 .../cassandra/auth/SpiffeCertificateValidator.java |  94 +++++++++
 src/java/org/apache/cassandra/config/Config.java   |   4 +-
 .../cassandra/config/DatabaseDescriptor.java       |   3 +-
 .../cassandra/config/ParameterizedClass.java       |  49 +++++
 .../cql3/statements/AddIdentityStatement.java      |  86 ++++++++
 .../cql3/statements/DropIdentityStatement.java     |  79 +++++++
 .../apache/cassandra/service/StorageService.java   |   2 +-
 .../cassandra-mtls-backward-compatibility.yaml     |  76 +++++++
 test/conf/cassandra-mtls.yaml                      |  89 ++++++++
 test/conf/cassandra_ssl_test.truststore            | Bin 5295 -> 7638 bytes
 test/conf/cassandra_ssl_test_outbound.keystore     | Bin 2286 -> 2375 bytes
 test/resources/auth/SampleInvalidCertificate.pem   |  21 ++
 .../resources/auth/SampleMtlsClientCertificate.pem |  29 +++
 .../SampleUnauthorizedMtlsClientCertificate.pem    |  29 +++
 .../cassandra/audit/AuditLoggerAuthTest.java       |   2 +-
 .../org/apache/cassandra/auth/AuthConfigTest.java  | 100 +++++++++
 .../org/apache/cassandra/auth/AuthTestUtils.java   |  46 +++++
 .../cassandra/auth/MutualTlsAuthenticatorTest.java | 181 +++++++++++++++++
 .../auth/MutualTlsInternodeAuthenticatorTest.java  | 188 +++++++++++++++++
 ...alTlsWithPasswordFallbackAuthenticatorTest.java |  93 +++++++++
 .../auth/SpiffeCertificateValidatorTest.java       |  58 ++++++
 .../cassandra/config/ConfigCompatabilityTest.java  |  17 +-
 .../config/YamlConfigurationLoaderTest.java        |  39 +++-
 .../cql3/statements/AddIdentityStatementTest.java  | 198 ++++++++++++++++++
 .../cql3/statements/DropIdentityStatementTest.java | 157 ++++++++++++++
 .../cassandra/transport/CQLUserAuditTest.java      |   2 +-
 42 files changed, 2618 insertions(+), 23 deletions(-)

diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 6414fab956..8783f53252 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -150,7 +150,14 @@ batchlog_replay_throttle: 1024KiB
 #   users. It keeps usernames and hashed passwords in system_auth.roles table.
 #   Please increase system_auth keyspace replication factor if you use this 
authenticator.
 #   If using PasswordAuthenticator, CassandraRoleManager must also be used 
(see below)
-authenticator: AllowAllAuthenticator
+authenticator:
+  class_name : org.apache.cassandra.auth.AllowAllAuthenticator
+# MutualTlsAuthenticator can be configured using the following configuration. 
One can add their own validator
+# which implements MutualTlsCertificateValidator class and provide logic for 
extracting identity out of certificates
+# and validating certificates.
+#  class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+#  parameters :
+#    validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
 
 # Authorization backend, implementing IAuthorizer; used to limit 
access/provide permissions
 # Out of the box, Cassandra provides 
org.apache.cassandra.auth.{AllowAllAuthorizer,
@@ -820,8 +827,17 @@ listen_address: localhost
 
 # Internode authentication backend, implementing IInternodeAuthenticator;
 # used to allow/disallow connections from peer nodes.
-# internode_authenticator: 
org.apache.cassandra.auth.AllowAllInternodeAuthenticator
-
+#internode_authenticator:
+#  class_name : org.apache.cassandra.auth.AllowAllInternodeAuthenticator
+#  parameters :
+# MutualTlsInternodeAuthenticator can be configured using the following 
configuration.One can add their own validator
+# which implements MutualTlsCertificateValidator class and provide logic for 
extracting identity out of certificates
+# and validating certificates.
+#  class_name : org.apache.cassandra.auth.MutualTlsInternodeAuthenticator
+#  parameters :
+#    validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+#    trusted_peer_identities: "spiffe1,spiffe2"
+#    node_identity: "spiffe1"
 # Whether to start the native transport server.
 # The address on which the native transport is bound is defined by rpc_address.
 start_native_transport: true
diff --git a/doc/modules/cassandra/pages/developing/cql/cql_singlefile.adoc 
b/doc/modules/cassandra/pages/developing/cql/cql_singlefile.adoc
index 64e71b051c..286a9923c2 100644
--- a/doc/modules/cassandra/pages/developing/cql/cql_singlefile.adoc
+++ b/doc/modules/cassandra/pages/developing/cql/cql_singlefile.adoc
@@ -2246,6 +2246,57 @@ LIST ROLES;
 
 but only roles with the `LOGIN` privilege are included in the output.
 
+[[databaseIdentity]]
+=== Database Identities
+
+[[AddIdentityStmt]]
+==== ADD IDENTITY
+
+_Syntax:_
+
+bc(syntax).. +
+::= ADD IDENTITY ( IF NOT EXISTS )? TO ROLE ?
+
+_Sample:_
+
+bc(sample). +
+ADD IDENTITY 'id1' TO ROLE 'role1';
+
+Only a user with privileges to add roles can add identities
+
+Role names & Identity names should be quoted if they contain non-alphanumeric 
characters.
+
+[[addIdentityConditional]]
+===== Adding an identity conditionally
+
+Attempting to add an existing identity results in an invalid query
+condition unless the `IF NOT EXISTS` option is used. If the option is
+used and the identity exists, the statement is a no-op.
+
+bc(sample). +
+ADD IDENTITY IF NOT EXISTS 'id1' TO ROLE 'role1';
+
+[[dropIdentityStmt]]
+==== DROP IDENTITY
+
+_Syntax:_
+
+bc(syntax).. +
+::= DROP IDENTITY ( IF EXISTS )?  +
+p.
+
+_Sample:_
+
+bc(sample). +
+DROP IDENTITY 'testIdentity'; +
+DROP IDENTITY IF EXISTS 'testIdentity';
+
+Only a user with privileges to drop roles can remove identities
+
+Attempting to drop an Identity which does not exist results in an invalid
+query condition unless the `IF EXISTS` option is used. If the option is
+used and the identity does not exist the statement is a no-op.
+
 [[dataControl]]
 === Data Control
 
diff --git 
a/doc/modules/cassandra/pages/getting-started/mtlsauthenticators.adoc 
b/doc/modules/cassandra/pages/getting-started/mtlsauthenticators.adoc
new file mode 100644
index 0000000000..e3cd6ef79f
--- /dev/null
+++ b/doc/modules/cassandra/pages/getting-started/mtlsauthenticators.adoc
@@ -0,0 +1,133 @@
+= Getting started with mTLS authenticators
+
+When a certificate based authentication protocol like TLS is used for client 
and
+Internode connections, `MutualTlsAuthenticator` & 
`MutualTlsInternodeAuthenticator`
+can be used for the authentication by leveraging the client certificates from 
the
+SSL handshake.
+
+After SSL handshake, identity from the client certificates is extracted and 
only
+authorized users will be granted access.
+
+== What is an Identity
+
+Operators can define their own identity for certificates by extracting some 
fields or
+information from the certificates. Implementing the interface 
`MutualTlsCertificateValidator`
+supports validating & extracting identities from the certificates that can be 
used by
+`MutualTlsAuthenticator` and `MutualTlsInternodeAuthenticator` to customize 
for the
+certificate conventions used in the deployment environment.
+
+There is a default implementation of `MutualTlsCertificateValidator` with
+https://spiffe.io/docs/latest/spiffe-about/spiffe-concepts/[SPIFFE] as the 
identity
+of the certificates.This requires spiffe to be present in the SAN of the 
certificate.
+
+Instead of using `SPIFFE` based validator, a custom `CN` based validator that 
implements `MutualTlsCertificateValidator`
+could be configured by the operator if required.
+
+== Configuring mTLS authenticator for client connections
+
+Note that the following steps uses SPIFFE identity as an example, If you are 
using
+a custom validator, use appropriate identity in place of 
`spiffe://testdomain.com/testIdentifier/testValue`.
+
+*STEP 1: Add authorized users to system_auth.identity_to_roles table*
+
+Note that only users with permissions to create/modify roles can add/remove 
identities.
+Client certificates with the identities in this table will be trusted by C*.
+[source, plaintext]
+----
+ADD IDENTITY 'spiffe://testdomain.com/testIdentifier/testValue' TO ROLE 
'read_only_user'
+----
+
+*STEP 2: Configure Cassandra.yaml with right properties*
+
+`client_encryption_options` configuration for mTLS connections
+[source, plaintext]
+----
+client_encryption_options:
+  enabled: true
+  optional: false
+  keystore: conf/.keystore
+  keystore_password: cassandra
+  truststore: conf/.truststore
+  truststore_password: cassandra
+  require_client_auth: true // to enable mTLS
+----
+Configure mTLS authenticator and the validator for client connections . If you 
are
+implementing a custom validator, use that instead of Spiffe validator
+[source, plaintext]
+----
+authenticator:
+  class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+  parameters :
+    validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+----
+
+*STEP 3: Bounce the cluster*
+
+After the bounce, C* will accept mTLS connections from the clients and if their
+identity is present in the `identity_to_roles` table, access will be granted.
+
+== Configuring mTLS authenticator for Internode connections
+
+Internode authenticator trusts certificates whose identities are present in
+`internode_authenticator.parameters.trusted_peer_identities` if configured.
+
+Otherwise, it trusts connections which have the same identity as the node.
+When a node is making an outbound connection to another node, it uses the
+certificate configured in `server_encryption_options.outbound_keystore`.
+During the start of the node, identity is extracted from the outbound keystore 
and
+connections from other nodes who have the same identity will be trusted if
+`trusted_peer_identities` is not configured.
+
+For example, if a node has `testIdentity` embedded in the certificate in
+outbound keystore, It trusts connections from other nodes when their 
certificates
+have `testIdentity` embedded in them.
+
+There is an optional configuration `node_identity` that can be used to verify 
identity
+extracted from the keystore to avoid any configuration errors.
+
+*STEP 1: Configure server_encryption_options in cassandra.yaml*
+
+[source, plaintext]
+----
+server_encryption_options:
+  internode_encryption: all
+  optional: true
+  keystore: conf/.keystore
+  keystore_password: cassandra
+  outbound_keystore: conf/.outbound_keystore
+  outbound_keystore_password: cassandra
+  require_client_auth: true  // for enabling mTLS
+  truststore: conf/.truststore
+  truststore_password: cassandra
+----
+
+*STEP 2: Configure Internode Authenticator and Validator*
+
+Configure mTLS Internode authenticator and validator. If you are
+implementing a custom validator, use that instead of Spiffe validator
+[source, plaintext]
+----
+internode_authenticator:
+  class_name : org.apache.cassandra.auth.MutualTlsInternodeAuthenticator
+  parameters :
+    validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+    trusted_peer_identities : "spiffe1,spiffe2"
+----
+
+*STEP 3: Bounce the cluster*
+Once all nodes in the cluster are restarted, all internode communications will 
be authenticated by mTLS.
+
+== Migration from existing password based authentication
+* For client connections, since the migration will not happen overnight,
+the operators can run cassandra in optional mTLS mode and use
+`MutualTlsWithPasswordFallbackAuthenticator` which will accept both mTLS & 
password
+based connections, based on the type of connection client is making. These 
settings
+can be configured in `cassandra.yaml`. Once all the clients migrate to using 
mTLS,
+turn off optional mode and set the authenticator to be 
`MutualTlsAuthenticator`. From
+that point only mTLS client connections will be accepted.
+
+* For Internode connections, while doing rolling upgrades from non-mTLS based 
configuration
+to mTLS based configuration, set `server_encryption_options.optional:true` for 
the new nodes to
+be able to connect to old nodes which are still using non-mTLS based 
configuration during upgrade.
+After this, change the internode authenticator to be 
`MutualTlsInternodeAuthenticator` and turn off the optional
+mode by setting `server_encryption_options.optional:false`.
\ No newline at end of file
diff --git a/src/antlr/Lexer.g b/src/antlr/Lexer.g
index a4f8ea715f..c75523ed74 100644
--- a/src/antlr/Lexer.g
+++ b/src/antlr/Lexer.g
@@ -156,6 +156,7 @@ K_NOLOGIN:     N O L O G I N;
 K_OPTIONS:     O P T I O N S;
 K_ACCESS:      A C C E S S;
 K_DATACENTERS: D A T A C E N T E R S;
+K_IDENTITY:    I D E N T I T Y;
 
 K_CLUSTERING:  C L U S T E R I N G;
 K_ASCII:       A S C I I;
diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g
index 65ed92a397..0d2150481d 100644
--- a/src/antlr/Parser.g
+++ b/src/antlr/Parser.g
@@ -247,6 +247,8 @@ cqlStatement returns [CQLStatement.Raw stmt]
     | st39=dropMaterializedViewStatement   { $stmt = st39; }
     | st40=alterMaterializedViewStatement  { $stmt = st40; }
     | st41=describeStatement               { $stmt = st41; }
+    | st42=addIdentityStatement            { $stmt = st42; }
+    | st43=dropIdentityStatement           { $stmt = st43; }
     ;
 
 /*
@@ -1237,6 +1239,28 @@ dropUserStatement returns [DropRoleStatement stmt]
     }
     : K_DROP K_USER (K_IF K_EXISTS { ifExists = true; })? u=username { 
name.setName($u.text, true); $stmt = new DropRoleStatement(name, ifExists); }
     ;
+/**
+ * ADD IDENTITY [IF NOT EXISTS] <identity> TO ROLE <role>
+ */
+addIdentityStatement returns [AddIdentityStatement stmt]
+    @init {
+        String identity = null;
+        String role = null;
+        boolean ifNotExists = false;
+    }
+    : K_ADD K_IDENTITY (K_IF K_NOT K_EXISTS { ifNotExists = true; })? 
u=identity { identity= $u.text; } K_TO K_ROLE r=identity { role=$r.text; $stmt 
= new AddIdentityStatement(identity, role, ifNotExists); }
+    ;
+
+/**
+ * DROP IDENTITY [IF EXISTS] <identity>
+ */
+ dropIdentityStatement returns [DropIdentityStatement stmt]
+      @init {
+          boolean ifExists = false;
+          String identity = null;
+      }
+      : K_DROP K_IDENTITY (K_IF K_EXISTS { ifExists = true; })? u=identity { 
identity= $u.text; $stmt = new DropIdentityStatement(identity, ifExists);}
+      ;
 
 /**
  * LIST USERS
@@ -1878,6 +1902,12 @@ username
     | QUOTED_NAME { addRecognitionError("Quoted strings are are not supported 
for user names and USER is deprecated, please use ROLE");}
     ;
 
+identity
+    : IDENT
+    | STRING_LITERAL
+    | QUOTED_NAME { addRecognitionError("Quoted strings are are not supported 
for identity");}
+    ;
+
 mbean
     : STRING_LITERAL
     ;
@@ -1923,6 +1953,7 @@ basic_unreserved_keyword returns [String str]
         | K_USERS
         | K_ROLE
         | K_ROLES
+        | K_IDENTITY
         | K_SUPERUSER
         | K_NOSUPERUSER
         | K_LOGIN
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntryType.java 
b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
index ccf0169ff8..1055f875e0 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
@@ -58,6 +58,8 @@ public enum AuditLogEntryType
     LIST_PERMISSIONS(AuditLogEntryCategory.DCL),
     ALTER_TYPE(AuditLogEntryCategory.DDL),
     CREATE_ROLE(AuditLogEntryCategory.DCL),
+    CREATE_IDENTITY(AuditLogEntryCategory.DCL),
+    DROP_IDENTITY(AuditLogEntryCategory.DCL),
     USE_KEYSPACE(AuditLogEntryCategory.OTHER),
     DESCRIBE(AuditLogEntryCategory.OTHER),
 
diff --git a/src/java/org/apache/cassandra/auth/AuthConfig.java 
b/src/java/org/apache/cassandra/auth/AuthConfig.java
index 9c5fceb6dc..5cecd6f130 100644
--- a/src/java/org/apache/cassandra/auth/AuthConfig.java
+++ b/src/java/org/apache/cassandra/auth/AuthConfig.java
@@ -18,11 +18,14 @@
 
 package org.apache.cassandra.auth;
 
+import java.util.Arrays;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -50,12 +53,15 @@ public final class AuthConfig
 
         /* Authentication, authorization and role management backend, 
implementing IAuthenticator, IAuthorizer & IRoleMapper*/
         if (conf.authenticator != null)
-            authenticator = FBUtilities.newAuthenticator(conf.authenticator);
+        {
+            authenticator = ParameterizedClass.newInstance(conf.authenticator,
+                                                           Arrays.asList("", 
AuthConfig.class.getPackage().getName()));
+        }
 
         // the configuration options regarding credentials caching are only 
guaranteed to
         // work with PasswordAuthenticator, so log a message if some other 
authenticator
         // is in use and non-default values are detected
-        if (!(authenticator instanceof PasswordAuthenticator)
+        if (!(authenticator instanceof PasswordAuthenticator || authenticator 
instanceof MutualTlsAuthenticator)
             && (conf.credentials_update_interval != null
                 || conf.credentials_validity.toMilliseconds() != 2000
                 || conf.credentials_cache_max_entries != 1000))
@@ -75,7 +81,7 @@ public final class AuthConfig
             authorizer = FBUtilities.newAuthorizer(conf.authorizer);
 
         if (!authenticator.requireAuthentication() && 
authorizer.requireAuthorization())
-            throw new ConfigurationException(conf.authenticator + " can't be 
used with " + conf.authorizer, false);
+            throw new ConfigurationException(conf.authenticator.class_name + " 
can't be used with " + conf.authorizer, false);
 
         DatabaseDescriptor.setAuthorizer(authorizer);
 
@@ -95,14 +101,18 @@ public final class AuthConfig
         // authenticator
 
         if (conf.internode_authenticator != null)
-            
DatabaseDescriptor.setInternodeAuthenticator(FBUtilities.construct(conf.internode_authenticator,
 "internode_authenticator"));
+        {
+            
DatabaseDescriptor.setInternodeAuthenticator(ParameterizedClass.newInstance(conf.internode_authenticator,
+                                                                               
         Arrays.asList("", AuthConfig.class.getPackage().getName())));
+        }
+
 
         // network authorizer
         INetworkAuthorizer networkAuthorizer = 
FBUtilities.newNetworkAuthorizer(conf.network_authorizer);
         DatabaseDescriptor.setNetworkAuthorizer(networkAuthorizer);
         if (networkAuthorizer.requireAuthorization() && 
!authenticator.requireAuthentication())
         {
-            throw new ConfigurationException(conf.network_authorizer + " can't 
be used with " + conf.authenticator, false);
+            throw new ConfigurationException(conf.network_authorizer + " can't 
be used with " + conf.authenticator.class_name, false);
         }
 
         // Validate at last to have authenticator, authorizer, role-manager 
and internode-auth setup
diff --git a/src/java/org/apache/cassandra/auth/AuthKeyspace.java 
b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
index 75d9871d03..b1616e07fa 100644
--- a/src/java/org/apache/cassandra/auth/AuthKeyspace.java
+++ b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
@@ -58,7 +58,8 @@ public final class AuthKeyspace
     public static final String ROLE_PERMISSIONS = "role_permissions";
     public static final String RESOURCE_ROLE_INDEX = 
"resource_role_permissons_index";
     public static final String NETWORK_PERMISSIONS = "network_permissions";
-    public static final Set<String> TABLE_NAMES = ImmutableSet.of(ROLES, 
ROLE_MEMBERS, ROLE_PERMISSIONS, RESOURCE_ROLE_INDEX, NETWORK_PERMISSIONS);
+    public static final String IDENTITY_TO_ROLES = "identity_to_role";
+    public static final Set<String> TABLE_NAMES = ImmutableSet.of(ROLES, 
ROLE_MEMBERS, ROLE_PERMISSIONS, RESOURCE_ROLE_INDEX, NETWORK_PERMISSIONS, 
IDENTITY_TO_ROLES);
 
     public static final long SUPERUSER_SETUP_DELAY = 
SUPERUSER_SETUP_DELAY_MS.getLong();
 
@@ -73,6 +74,15 @@ public final class AuthKeyspace
               + "member_of set<text>,"
               + "PRIMARY KEY(role))");
 
+    private static final TableMetadata IdentityToRoles =
+        parse(IDENTITY_TO_ROLES,
+              "mtls authorized identities lookup table",
+              "CREATE TABLE %s ("
+              + "identity text," // opaque identity string for use by role 
authenticators
+              + "role text,"
+              + "PRIMARY KEY(identity))"
+          );
+
     private static final TableMetadata RoleMembers =
         parse(ROLE_MEMBERS,
               "role memberships lookup table",
@@ -119,6 +129,6 @@ public final class AuthKeyspace
     {
         return KeyspaceMetadata.create(SchemaConstants.AUTH_KEYSPACE_NAME,
                                        
KeyspaceParams.simple(Math.max(DEFAULT_RF, 
DatabaseDescriptor.getDefaultKeyspaceRF())),
-                                       Tables.of(Roles, RoleMembers, 
RolePermissions, ResourceRoleIndex, NetworkPermissions));
+                                       Tables.of(Roles, RoleMembers, 
RolePermissions, ResourceRoleIndex, NetworkPermissions, IdentityToRoles));
     }
 }
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java 
b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 37bda4eac4..3221c85184 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.auth;
 
+import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +47,7 @@ import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.NoSpamLogger;
 import org.mindrot.jbcrypt.BCrypt;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.AUTH_BCRYPT_GENSALT_LOG2_ROUNDS;
@@ -80,6 +82,7 @@ import static 
org.apache.cassandra.utils.Clock.Global.nanoTime;
 public class CassandraRoleManager implements IRoleManager
 {
     private static final Logger logger = 
LoggerFactory.getLogger(CassandraRoleManager.class);
+    private static final NoSpamLogger nospamLogger = 
NoSpamLogger.getLogger(logger, 1L, TimeUnit.MINUTES);
 
     public static final String DEFAULT_SUPERUSER_NAME = "cassandra";
     public static final String DEFAULT_SUPERUSER_PASSWORD = "cassandra";
@@ -125,6 +128,7 @@ public class CassandraRoleManager implements IRoleManager
     }
 
     private SelectStatement loadRoleStatement;
+    private SelectStatement loadIdentityStatement;
 
     private final Set<Option> supportedOptions;
     private final Set<Option> alterableOptions;
@@ -143,12 +147,72 @@ public class CassandraRoleManager implements IRoleManager
     public void setup()
     {
         loadRoleStatement();
+        loadIdentityStatement();
         scheduleSetupTask(() -> {
             setupDefaultRole();
             return null;
         });
     }
 
+    @Override
+    public String roleForIdentity(String identity)
+    {
+        QueryOptions options = 
QueryOptions.forInternalCalls(CassandraAuthorizer.authReadConsistencyLevel(),
+                                                             
Collections.singletonList(byteBuf(identity)));
+        ResultMessage.Rows rows = select(loadIdentityStatement, options);
+        if (rows.result.isEmpty())
+        {
+            nospamLogger.warn("No such identity {} in the identity_to_roles 
table", identity);
+            return null;
+        }
+        return UntypedResultSet.create(rows.result).one().getString("role");
+    }
+
+    @Override
+    public Map<String, String> authorizedIdentities()
+    {
+        Map<String, String> validIdentities = new HashMap<>();
+        String query = String.format("SELECT identity, role from %s.%s",
+                                     SchemaConstants.AUTH_KEYSPACE_NAME,
+                                     AuthKeyspace.IDENTITY_TO_ROLES);
+        UntypedResultSet rows = process(query, 
CassandraAuthorizer.authReadConsistencyLevel());
+        rows.forEach(row -> validIdentities.put(row.getString("identity"), 
row.getString("role")));
+        return validIdentities;
+    }
+
+    @Override
+    public void addIdentity(String identity, String role)
+    {
+        if (isExistingIdentity(identity))
+        {
+            throw new IllegalStateException("Identity is already associated 
with another role, cannot associate it with role " + role);
+        }
+
+        String query = String.format("INSERT INTO %s.%s (identity, role) 
VALUES (?, ?)",
+                                     SchemaConstants.AUTH_KEYSPACE_NAME,
+                                     AuthKeyspace.IDENTITY_TO_ROLES);
+        process(query, CassandraAuthorizer.authWriteConsistencyLevel(), 
byteBuf(identity), byteBuf(role));
+    }
+
+    @Override
+    public boolean isExistingIdentity(String identity)
+    {
+        String query = String.format("SELECT identity from %s.%s where 
identity=?",
+                                     SchemaConstants.AUTH_KEYSPACE_NAME,
+                                     AuthKeyspace.IDENTITY_TO_ROLES);
+        UntypedResultSet rows = process(query, 
CassandraAuthorizer.authReadConsistencyLevel(), byteBuf(identity));
+        return !rows.isEmpty();
+    }
+
+    @Override
+    public void dropIdentity(String identity)
+    {
+        String query = String.format("DELETE FROM %s.%s WHERE identity = ?",
+                                     SchemaConstants.AUTH_KEYSPACE_NAME,
+                                     AuthKeyspace.IDENTITY_TO_ROLES);
+        process(query, CassandraAuthorizer.authWriteConsistencyLevel(), 
byteBuf(identity));
+    }
+
     protected final void loadRoleStatement()
     {
         loadRoleStatement = (SelectStatement) prepare("SELECT * from %s.%s 
WHERE role = ?",
@@ -156,6 +220,14 @@ public class CassandraRoleManager implements IRoleManager
                                                       AuthKeyspace.ROLES);
     }
 
+
+    protected void loadIdentityStatement()
+    {
+        loadIdentityStatement = (SelectStatement) prepare("SELECT role from 
%s.%s where identity=?",
+                                                          
SchemaConstants.AUTH_KEYSPACE_NAME,
+                                                          
AuthKeyspace.IDENTITY_TO_ROLES);
+    }
+
     public Set<Option> supportedOptions()
     {
         return supportedOptions;
@@ -169,6 +241,11 @@ public class CassandraRoleManager implements IRoleManager
     public void createRole(AuthenticatedUser performer, RoleResource role, 
RoleOptions options)
     throws RequestValidationException, RequestExecutionException
     {
+        List<String> identitiesOfRole = identitiesForRole(role.getRoleName());
+        if (!identitiesOfRole.isEmpty())
+        {
+            throw new IllegalStateException(String.format("Cannot create a 
role '%s' when identities already exists for it", role.getRoleName()));
+        }
         String insertCql = options.getPassword().isPresent() || 
options.getHashedPassword().isPresent()
                          ? String.format("INSERT INTO %s.%s (role, 
is_superuser, can_login, salted_hash) VALUES ('%s', %s, %s, '%s')",
                                          SchemaConstants.AUTH_KEYSPACE_NAME,
@@ -194,6 +271,7 @@ public class CassandraRoleManager implements IRoleManager
                               escape(role.getRoleName())),
                 consistencyForRoleWrite(role.getRoleName()));
         removeAllMembers(role.getRoleName());
+        removeAllIdentitiesOfRole(role.getRoleName());
     }
 
     public void alterRole(AuthenticatedUser performer, RoleResource role, 
RoleOptions options)
@@ -470,6 +548,31 @@ public class CassandraRoleManager implements IRoleManager
                 consistencyForRoleWrite(grantee));
     }
 
+    private List<String> identitiesForRole(String role)
+    {
+        // Get all identities associated with a given role
+        String query = String.format("SELECT identity FROM %s.%s WHERE role = 
? ALLOW FILTERING",
+                                     SchemaConstants.AUTH_KEYSPACE_NAME,
+                                     AuthKeyspace.IDENTITY_TO_ROLES);
+        UntypedResultSet rows = process(query, consistencyForRoleRead(role), 
byteBuf(role));
+        List<String> identities = new ArrayList<>();
+        rows.forEach(row -> identities.add(row.getString("identity")));
+        return identities;
+    }
+
+    private void removeAllIdentitiesOfRole(String role)
+    {
+        List<String> identities = identitiesForRole(role);
+        String query = String.format("DELETE FROM %s.%s WHERE identity = ?",
+                                     SchemaConstants.AUTH_KEYSPACE_NAME,
+                                     AuthKeyspace.IDENTITY_TO_ROLES);
+        // Remove all the identities associated with the role from the table
+        for (String identity : identities)
+        {
+            process(query, consistencyForRoleWrite(role), byteBuf(identity));
+        }
+    }
+
     /*
      * Clear the membership list of the given role
      */
@@ -534,6 +637,11 @@ public class CassandraRoleManager implements IRoleManager
         return StringUtils.replace(name, "'", "''");
     }
 
+    private static ByteBuffer byteBuf(String str)
+    {
+        return UTF8Type.instance.decompose(str);
+    }
+
     /** Allows selective overriding of the consistency level for specific 
roles. */
     protected static ConsistencyLevel consistencyForRoleWrite(String role)
     {
@@ -561,6 +669,12 @@ public class CassandraRoleManager implements IRoleManager
         return QueryProcessor.process(query, consistencyLevel);
     }
 
+    UntypedResultSet process(String query, ConsistencyLevel consistencyLevel, 
ByteBuffer... values)
+    throws RequestValidationException, RequestExecutionException
+    {
+        return QueryProcessor.process(query, consistencyLevel, 
Arrays.asList(values));
+    }
+
     @VisibleForTesting
     ResultMessage.Rows select(SelectStatement statement, QueryOptions options)
     {
diff --git a/src/java/org/apache/cassandra/auth/IInternodeAuthenticator.java 
b/src/java/org/apache/cassandra/auth/IInternodeAuthenticator.java
index e5038c0944..d59320abf7 100644
--- a/src/java/org/apache/cassandra/auth/IInternodeAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/IInternodeAuthenticator.java
@@ -74,7 +74,7 @@ public interface IInternodeAuthenticator
     /**
      * Setup is called once upon system startup to initialize the 
IAuthenticator.
      *
-     * For example, use this method to create any required keyspaces/column 
families.
+     * For example, use this method to do any required initialization of the 
class.
      */
     default void setupInternode()
     {
diff --git a/src/java/org/apache/cassandra/auth/IRoleManager.java 
b/src/java/org/apache/cassandra/auth/IRoleManager.java
index 688d5bb6c4..460bf1a16f 100644
--- a/src/java/org/apache/cassandra/auth/IRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/IRoleManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.auth;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -226,4 +227,59 @@ public interface IRoleManager extends 
AuthCache.BulkLoader<RoleResource, Set<Rol
      * For example, use this method to create any required keyspaces/column 
families.
      */
     void setup();
+
+    /**
+     * Each valid identity is associated with a role in the identity_to_role 
table, this method returns role
+     * of a given identity
+     *
+     * @param identity identity whose role to be retrieved
+     * @return role of the given identity
+     */
+    default String roleForIdentity(String identity)
+    {
+        return null;
+    }
+
+    /**
+     * Returns all the authorized identities from the identity_to_role table
+     *
+     * @return Map of identity -> roles
+     */
+    default Map<String, String> authorizedIdentities()
+    {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Adds a row (identity, role) to the identity_to_role table
+     *
+     * @param identity identity to be added
+     * @param role role that is associated with the identity
+     */
+    default void addIdentity(String identity, String role)
+    {
+    }
+
+    /**
+     * Returns if an identity exists in the identity_to_role
+     *
+     * @param identity identity whose existence to verify
+     * @return
+     */
+    default boolean isExistingIdentity(String identity)
+    {
+        return false;
+    }
+
+    /**
+     * Called on the execution of DROP IDENTITY statement for removing a given 
identity from the identity_role table.
+     * This implies we want to revoke the access for the given identity.
+     *
+     * @param identity identity that has to be removed from the table
+     */
+    default void dropIdentity(String identity)
+    {
+
+    }
+
 }
diff --git a/src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java 
b/src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java
new file mode 100644
index 0000000000..c327a4bd21
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.net.InetAddress;
+import java.security.cert.Certificate;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.helpers.MessageFormatter;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/*
+ * Performs mTLS authentication for client connections by extracting 
identities from client certificate
+ * and verifying them against the authorized identities in IdentityCache. 
IdentityCache is a loading cache that
+ * refreshes values on timely basis.
+ *
+ * During a client connection, after SSL handshake, identity of certificate is 
extracted using the certificate validator
+ * and is verified whether the value exists in the cache or not. If it exists 
access is granted, otherwise, the connection
+ * is rejected.
+ *
+ * Authenticator & Certificate validator can be configured using 
cassandra.yaml, one can write their own mTLS certificate
+ * validator and configure it in cassandra.yaml.Below is an example on how to 
configure validator.
+ * note that this example uses SPIFFE based validator, It could be any other 
validator with any defined identifier format.
+ *
+ * Example:
+ * authenticator:
+ *   class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+ *   parameters :
+ *     validator_class_name: 
org.apache.cassandra.auth.SpiffeCertificateValidator
+ */
+public class MutualTlsAuthenticator implements IAuthenticator
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(MutualTlsAuthenticator.class);
+    private static final NoSpamLogger nospamLogger = 
NoSpamLogger.getLogger(logger, 1L, TimeUnit.MINUTES);
+    private static final String VALIDATOR_CLASS_NAME = "validator_class_name";
+    private static final String CACHE_NAME = "IdentitiesCache";
+    private final IdentityCache identityCache = new IdentityCache();
+    private final MutualTlsCertificateValidator certificateValidator;
+
+    public MutualTlsAuthenticator(Map<String, String> parameters)
+    {
+        final String certificateValidatorClassName = 
parameters.get(VALIDATOR_CLASS_NAME);
+        if (StringUtils.isEmpty(certificateValidatorClassName))
+        {
+            String message ="authenticator.parameters.validator_class_name is 
not set";
+            logger.error(message);
+            throw new ConfigurationException(message);
+        }
+        certificateValidator = ParameterizedClass.newInstance(new 
ParameterizedClass(certificateValidatorClassName),
+                                                              
Arrays.asList("", AuthConfig.class.getPackage().getName()));
+        checkMtlsConfigurationIsValid(DatabaseDescriptor.getRawConfig());
+        AuthCacheService.instance.register(identityCache);
+    }
+
+    @Override
+    public boolean requireAuthentication()
+    {
+        return true;
+    }
+
+    @Override
+    public Set<? extends IResource> protectedResources()
+    {
+        return 
ImmutableSet.of(DataResource.table(SchemaConstants.AUTH_KEYSPACE_NAME, 
AuthKeyspace.ROLES));
+    }
+
+    @Override
+    public void validateConfiguration() throws ConfigurationException
+    {
+
+    }
+
+    @Override
+    public void setup()
+    {
+        identityCache.warm();
+    }
+
+    @Override
+    public SaslNegotiator newSaslNegotiator(InetAddress clientAddress)
+    {
+        return null;
+    }
+
+    @Override
+    public SaslNegotiator newSaslNegotiator(InetAddress clientAddress, 
Certificate[] certificates)
+    {
+        return new CertificateNegotiator(certificates);
+    }
+
+    @Override
+    public AuthenticatedUser legacyAuthenticate(Map<String, String> 
credentials) throws AuthenticationException
+    {
+        throw new AuthenticationException("mTLS authentication is not 
supported for CassandraLoginModule");
+    }
+
+    @VisibleForTesting
+    class CertificateNegotiator implements SaslNegotiator
+    {
+        private final Certificate[] clientCertificateChain;
+
+        private CertificateNegotiator(final Certificate[] 
clientCertificateChain)
+        {
+            this.clientCertificateChain = clientCertificateChain;
+        }
+
+        @Override
+        public byte[] evaluateResponse(byte[] clientResponse) throws 
AuthenticationException
+        {
+            return null;
+        }
+
+        @Override
+        public boolean isComplete()
+        {
+            return true;
+        }
+
+        @Override
+        public AuthenticatedUser getAuthenticatedUser() throws 
AuthenticationException
+        {
+            if 
(!certificateValidator.isValidCertificate(clientCertificateChain))
+            {
+                String message = "Invalid or not supported certificate";
+                nospamLogger.error(message);
+                throw new AuthenticationException(message);
+            }
+
+            final String identity = 
certificateValidator.identity(clientCertificateChain);
+            if (StringUtils.isEmpty(identity))
+            {
+                String msg = "Unable to extract client identity from 
certificate for authentication";
+                nospamLogger.error(msg);
+                throw new AuthenticationException(msg);
+            }
+            String role = identityCache.get(identity);
+            if (role == null)
+            {
+                String msg = "Certificate identity '{}' not authorized";
+                nospamLogger.error(msg, identity);
+                throw new AuthenticationException(MessageFormatter.format(msg, 
identity).getMessage());
+            }
+            return new AuthenticatedUser(role);
+        }
+    }
+
+    private void checkMtlsConfigurationIsValid(Config config)
+    {
+        if (!config.client_encryption_options.getEnabled() || 
!config.client_encryption_options.require_client_auth)
+        {
+            String msg = "MutualTlsAuthenticator requires 
client_encryption_options.enabled to be true" +
+                         " & client_encryption_options.require_client_auth to 
be true";
+            logger.error(msg);
+            throw new ConfigurationException(msg);
+        }
+    }
+
+    static class IdentityCache extends AuthCache<String, String>
+    {
+        IdentityCache()
+        {
+            super(CACHE_NAME,
+                  DatabaseDescriptor::setCredentialsValidity,
+                  DatabaseDescriptor::getCredentialsValidity,
+                  DatabaseDescriptor::setCredentialsUpdateInterval,
+                  DatabaseDescriptor::getCredentialsUpdateInterval,
+                  DatabaseDescriptor::setCredentialsCacheMaxEntries,
+                  DatabaseDescriptor::getCredentialsCacheMaxEntries,
+                  DatabaseDescriptor::setCredentialsCacheActiveUpdate,
+                  DatabaseDescriptor::getCredentialsCacheActiveUpdate,
+                  identity -> 
DatabaseDescriptor.getRoleManager().roleForIdentity(identity),
+                  () -> 
DatabaseDescriptor.getRoleManager().authorizedIdentities(),
+                  () -> true,
+                  (k, v) -> v == null);
+        }
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/auth/MutualTlsCertificateValidator.java 
b/src/java/org/apache/cassandra/auth/MutualTlsCertificateValidator.java
new file mode 100644
index 0000000000..e9735f48bf
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/MutualTlsCertificateValidator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.security.cert.Certificate;
+
+import org.apache.cassandra.exceptions.AuthenticationException;
+
+/**
+ * Interface for certificate validation and authorization for mTLS 
authenticators.
+ *
+ * This interface can be implemented to provide logic for extracting custom 
identities from client certificates
+ * to uniquely identify the certificates. It can also be used to provide 
custom authorization logic to authenticate
+ * clients using client certificates during mTLS connections.
+ */
+public interface MutualTlsCertificateValidator
+{
+    /**
+     * Perform any checks that are to be performed on the certificate before 
making authorization check to grant the
+     * access to the client during mTLS connection.
+     *
+     * For example
+     *  - Verifying CA information
+     *  - Checking CN information
+     *  - Validating Issuer information
+     *  - Checking organization information etc
+     *
+     * @param clientCertificateChain client certificate chain
+     * @return returns if the certificate is valid or not
+     */
+    boolean isValidCertificate(Certificate[] clientCertificateChain);
+
+    /**
+     * This method should provide logic to extract identity out of a 
certificate to perform mTLS authentication.
+     *
+     * An example of identity could be the following
+     *  - an identifier in SAN of the certificate like SPIFFE
+     *  - CN of the certificate
+     *  - any other fields in the certificate can be combined and be used as 
identifier of the certificate
+     *
+     * @param clientCertificateChain client certificate chain
+     * @return identifier extracted from certificate
+     * @throws AuthenticationException when identity cannot be extracted
+     */
+    String identity(Certificate[] clientCertificateChain) throws 
AuthenticationException;
+
+}
diff --git 
a/src/java/org/apache/cassandra/auth/MutualTlsInternodeAuthenticator.java 
b/src/java/org/apache/cassandra/auth/MutualTlsInternodeAuthenticator.java
new file mode 100644
index 0000000000..3c466df5fa
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/MutualTlsInternodeAuthenticator.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.nio.file.Files;
+import java.nio.file.Paths; // checkstyle: permit this import
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/*
+ * Performs mTLS authentication for internode connections by extracting 
identities from the certificates of incoming
+ * connection and verifying them against a list of authorized peers. 
Authorized peers can be configured in
+ * trusted_peer_identities in cassandra yaml, otherwise authenticator trusts 
connections from peers which has the same
+ * identity as the one that the node uses for making outbound connections.
+ *
+ * Optionally cassandra can validate the identity extracted from outbound 
keystore with node_identity that is configured
+ * in cassandra.yaml to avoid any configuration errors.
+ *
+ * Authenticator & Certificate validator can be configured using 
cassandra.yaml, operators can write their own mTLS
+ * certificate validator and configure it in cassandra.yaml.Below is an 
example on how to configure validator.
+ * Note that this example uses SPIFFE based validator, it could be any other 
validator with any defined identifier format.
+ *
+ * internode_authenticator:
+ *   class_name : org.apache.cassandra.auth.AllowAllInternodeAuthenticator
+ *   parameters :
+ *     validator_class_name: 
org.apache.cassandra.auth.SpiffeCertificateValidator
+ *     trusted_peer_identities: "spiffe1,spiffe2"
+ *     node_identity: "spiffe1"
+ */
+public class MutualTlsInternodeAuthenticator implements IInternodeAuthenticator
+{
+    private static final String VALIDATOR_CLASS_NAME = "validator_class_name";
+    private static final String TRUSTED_PEER_IDENTITIES = 
"trusted_peer_identities";
+    private static final String NODE_IDENTITY = "node_identity";
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 
30L, TimeUnit.SECONDS);
+    private final MutualTlsCertificateValidator certificateValidator;
+    private final List<String> trustedIdentities;
+
+    public MutualTlsInternodeAuthenticator(Map<String, String> parameters)
+    {
+        String certificateValidatorClassName = 
parameters.get(VALIDATOR_CLASS_NAME);
+        if (StringUtils.isEmpty(certificateValidatorClassName))
+        {
+            String message = 
"internode_authenticator.parameters.validator_class_name is not set";
+            logger.error(message);
+            throw new ConfigurationException(message);
+        }
+
+        certificateValidator = ParameterizedClass.newInstance(new 
ParameterizedClass(certificateValidatorClassName),
+                                                              
Arrays.asList("", AuthConfig.class.getPackage().getName()));
+        Config config = DatabaseDescriptor.getRawConfig();
+        checkInternodeMtlsConfigurationIsValid(config);
+
+        if (parameters.containsKey(TRUSTED_PEER_IDENTITIES))
+        {
+            // If trusted_peer_identities identities is configured in 
cassandra.yaml trust only those identities
+            trustedIdentities = 
Arrays.stream(parameters.get(TRUSTED_PEER_IDENTITIES).split(","))
+                                      .collect(Collectors.toList());
+        }
+        else
+        {
+            // Otherwise, trust the identities extracted from outbound 
keystore which is the identity that the node uses
+            // for making outbound connections.
+            trustedIdentities = 
getIdentitiesFromKeyStore(config.server_encryption_options.outbound_keystore,
+                                                          
config.server_encryption_options.outbound_keystore_password,
+                                                          
config.server_encryption_options.store_type);
+            // optionally, if node_identity is configured in the yaml, 
validate the identity extracted from outbound
+            // keystore to avoid any configuration errors
+            if(parameters.containsKey(NODE_IDENTITY))
+            {
+                String nodeIdentity = parameters.get(NODE_IDENTITY);
+                if(!trustedIdentities.contains(nodeIdentity))
+                {
+                    throw new ConfigurationException("Configured node identity 
is not matching identity extracted" +
+                                                     "from the keystore");
+                }
+                
trustedIdentities.retainAll(Collections.singleton(nodeIdentity));
+            }
+        }
+
+        if (!trustedIdentities.isEmpty())
+        {
+            logger.info("Initializing internode authenticator with identities 
{}", trustedIdentities);
+        }
+        else
+        {
+            String message = String.format("No identity was extracted from the 
outbound keystore '%s'", config.server_encryption_options.outbound_keystore);
+            logger.info(message);
+            throw new ConfigurationException(message);
+        }
+    }
+
+    @Override
+    public boolean authenticate(InetAddress remoteAddress, int remotePort)
+    {
+        throw new UnsupportedOperationException("mTLS Authenticator only 
supports certificate based authenticate method");
+    }
+
+    @Override
+    public boolean authenticate(InetAddress remoteAddress, int remotePort, 
Certificate[] certificates, InternodeConnectionDirection connectionType)
+    {
+        return authenticateInternodeWithMtls(remoteAddress, remotePort, 
certificates, connectionType);
+    }
+
+
+    @Override
+    public void validateConfiguration() throws ConfigurationException
+    {
+
+    }
+
+    protected boolean authenticateInternodeWithMtls(InetAddress remoteAddress, 
int remotePort, Certificate[] certificates,
+                                                    
IInternodeAuthenticator.InternodeConnectionDirection connectionType)
+    {
+        if (connectionType == 
IInternodeAuthenticator.InternodeConnectionDirection.INBOUND)
+        {
+            String identity = certificateValidator.identity(certificates);
+            if (!certificateValidator.isValidCertificate(certificates))
+            {
+                noSpamLogger.error("Not a valid certificate from {}:{} with 
identity '{}'", remoteAddress, remotePort, identity);
+                return false;
+            }
+
+            if(!trustedIdentities.contains(identity))
+            {
+                noSpamLogger.error("Unable to authenticate user {}", identity);
+                return false;
+            }
+            return true;
+        }
+        // Outbound connections don't need to be authenticated again in 
certificate based connections. SSL handshake
+        // makes sure that we are talking to valid server by checking root 
certificates of the server in the
+        // truststore of the client.
+        return true;
+    }
+
+    @VisibleForTesting
+    List<String> getIdentitiesFromKeyStore(final String outboundKeyStorePath,
+                                           final String 
outboundKeyStorePassword,
+                                           final String storeType)
+    {
+        final List<String> allUsers = new ArrayList<>();
+        try (InputStream ksf = 
Files.newInputStream(Paths.get(outboundKeyStorePath)))
+        {
+            final KeyStore ks = KeyStore.getInstance(storeType);
+            ks.load(ksf, outboundKeyStorePassword.toCharArray());
+            Enumeration<String> enumeration = ks.aliases();
+            while (enumeration.hasMoreElements())
+            {
+                String alias = enumeration.nextElement();
+                Certificate[] chain = ks.getCertificateChain(alias);
+                if (chain == null)
+                {
+                    logger.warn("Full chain/private key is not present in the 
keystore for certificate {}", alias);
+                    continue;
+                }
+                try
+                {
+                    allUsers.add(certificateValidator.identity(chain));
+                }
+                catch (AuthenticationException e)
+                {
+                    // When identity cannot be extracted, this exception is 
thrown
+                    // Ignore it, since only few certificates might contain 
identity
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            logger.error("Failed to get identities from outbound_keystore {}", 
outboundKeyStorePath, e);
+        }
+        return allUsers;
+    }
+
+    private void checkInternodeMtlsConfigurationIsValid(Config config)
+    {
+        if (config.server_encryption_options.internode_encryption == 
EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none
+            || !config.server_encryption_options.require_client_auth)
+        {
+            String msg = "MutualTlsInternodeAuthenticator requires 
server_encryption_options.internode_encryption to be enabled" +
+                         " & server_encryption_options.require_client_auth to 
be true";
+            logger.error(msg);
+            throw new ConfigurationException(msg);
+        }
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/auth/MutualTlsWithPasswordFallbackAuthenticator.java
 
b/src/java/org/apache/cassandra/auth/MutualTlsWithPasswordFallbackAuthenticator.java
new file mode 100644
index 0000000000..eba0ea54c0
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/auth/MutualTlsWithPasswordFallbackAuthenticator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.net.InetAddress;
+import java.security.cert.Certificate;
+import java.util.Map;
+
+/**
+ * This authenticator can be used in optional mTLS mode, If the client doesn't 
make an mTLS connection
+ * this fallbacks to password authentication.
+ */
+public class MutualTlsWithPasswordFallbackAuthenticator extends 
PasswordAuthenticator
+{
+    private final MutualTlsAuthenticator mutualTlsAuthenticator;
+    public MutualTlsWithPasswordFallbackAuthenticator(Map<String, String> 
parameters)
+    {
+        mutualTlsAuthenticator = new MutualTlsAuthenticator(parameters);
+    }
+
+    @Override
+    public void setup()
+    {
+        super.setup();
+        mutualTlsAuthenticator.setup();
+    }
+
+    @Override
+    public SaslNegotiator newSaslNegotiator(InetAddress clientAddress, 
Certificate[] certificates)
+    {
+        if (certificates == null || certificates.length == 0)
+        {
+            return newSaslNegotiator(clientAddress);
+        }
+        return mutualTlsAuthenticator.newSaslNegotiator(clientAddress, 
certificates);
+    }
+}
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java 
b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 0ce96d80cf..e3046f6fc3 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -243,7 +243,8 @@ public class PasswordAuthenticator implements 
IAuthenticator, AuthCache.BulkLoad
         return (SelectStatement) QueryProcessor.getStatement(query, 
ClientState.forInternalCalls());
     }
 
-    private class PlainTextSaslAuthenticator implements SaslNegotiator
+    @VisibleForTesting
+    class PlainTextSaslAuthenticator implements SaslNegotiator
     {
         private boolean complete = false;
         private String username;
diff --git a/src/java/org/apache/cassandra/auth/SpiffeCertificateValidator.java 
b/src/java/org/apache/cassandra/auth/SpiffeCertificateValidator.java
new file mode 100644
index 0000000000..9260ce38a4
--- /dev/null
+++ b/src/java/org/apache/cassandra/auth/SpiffeCertificateValidator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.exceptions.AuthenticationException;
+
+/**
+ * This class assumes that the identity of a certificate is SPIFFE which is a 
URI that is present as part of the SAN
+ * of the client certificate. It has logic to extract identity (Spiffe) out of 
a certificate & knows how to validate
+ * the client certificates.
+ * <p>
+ *
+ * <p>
+ * Example:
+ * internode_authenticator:
+ * class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+ * parameters :
+ * validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+ * authenticator:
+ * class_name : org.apache.cassandra.auth.MutualTlsInternodeAuthenticator
+ * parameters :
+ * validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+ */
+public class SpiffeCertificateValidator implements 
MutualTlsCertificateValidator
+{
+    @Override
+    public boolean isValidCertificate(Certificate[] clientCertificateChain)
+    {
+        return true;
+    }
+
+    @Override
+    public String identity(Certificate[] clientCertificateChain) throws 
AuthenticationException
+    {
+        // returns spiffe
+        try
+        {
+            return getSANSpiffe(clientCertificateChain);
+        }
+        catch (CertificateException e)
+        {
+            throw new AuthenticationException(e.getMessage(), e);
+        }
+    }
+
+    private static String getSANSpiffe(final Certificate[] clientCertificates) 
throws CertificateException
+    {
+        int URI_TYPE = 6;
+        X509Certificate[] castedCerts = castCertsToX509(clientCertificates);
+        Collection<List<?>> subjectAltNames = 
castedCerts[0].getSubjectAlternativeNames();
+
+        if (subjectAltNames != null)
+        {
+            for (List<?> item : subjectAltNames)
+            {
+                Integer type = (Integer) item.get(0);
+                String spiffe = (String) item.get(1);
+                if (type == URI_TYPE && spiffe.startsWith("spiffe://"))
+                {  // Spiffe is a URI
+                    return spiffe;
+                }
+            }
+        }
+        throw new CertificateException("Unable to extract Spiffe from the 
certificate");
+    }
+
+    private static X509Certificate[] castCertsToX509(Certificate[] 
clientCertificateChain)
+    {
+        return Arrays.asList(clientCertificateChain).toArray(new 
X509Certificate[0]);
+    }
+}
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index e254e54e05..5e904bbf3e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -77,7 +77,7 @@ public class Config
     public static final String PROPERTY_PREFIX = "cassandra.";
 
     public String cluster_name = "Test Cluster";
-    public String authenticator;
+    public ParameterizedClass authenticator;
     public String authorizer;
     public String role_manager;
     public ParameterizedClass crypto_provider;
@@ -213,7 +213,7 @@ public class Config
     public boolean listen_interface_prefer_ipv6 = false;
     public String broadcast_address;
     public boolean listen_on_broadcast_address = false;
-    public String internode_authenticator;
+    public ParameterizedClass internode_authenticator;
 
     public boolean traverse_auth_from_root = false;
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 558f4a349b..88c0ab9e7d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -420,7 +420,8 @@ public class DatabaseDescriptor
         }
     }
 
-    private static void setConfig(Config config)
+    @VisibleForTesting
+    public static void setConfig(Config config)
     {
         conf = config;
     }
diff --git a/src/java/org/apache/cassandra/config/ParameterizedClass.java 
b/src/java/org/apache/cassandra/config/ParameterizedClass.java
index 9b001786e3..2d0390ef44 100644
--- a/src/java/org/apache/cassandra/config/ParameterizedClass.java
+++ b/src/java/org/apache/cassandra/config/ParameterizedClass.java
@@ -17,11 +17,14 @@
  */
 package org.apache.cassandra.config;
 
+import java.lang.reflect.Constructor;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.Shared;
 
 import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
@@ -40,6 +43,12 @@ public class ParameterizedClass
         // for snakeyaml
     }
 
+    public ParameterizedClass(String class_name)
+    {
+        this.class_name = class_name;
+        this.parameters = Collections.emptyMap();
+    }
+
     public ParameterizedClass(String class_name, Map<String, String> 
parameters)
     {
         this.class_name = class_name;
@@ -53,6 +62,46 @@ public class ParameterizedClass
              p.containsKey(PARAMETERS) ? (Map<String, 
String>)((List<?>)p.get(PARAMETERS)).get(0) : null);
     }
 
+    static public <K> K newInstance(ParameterizedClass parameterizedClass, 
List<String> searchPackages)
+    {
+        Exception last = null;
+        if (searchPackages == null || searchPackages.isEmpty())
+            searchPackages = Collections.singletonList("");
+        for (String searchPackage : searchPackages)
+        {
+            try
+            {
+                if (!searchPackage.isEmpty() && !searchPackage.endsWith("."))
+                    searchPackage = searchPackage + '.';
+                String name = searchPackage + parameterizedClass.class_name;
+                Class<?> providerClass = Class.forName(name);
+                try
+                {
+                    Constructor<?> constructor = 
providerClass.getConstructor(Map.class);
+                    K instance = (K) 
constructor.newInstance(parameterizedClass.parameters);
+                    return instance;
+                }
+                catch (Exception constructorEx)
+                {
+                    //no-op
+                }
+                // fallback to no arg constructor if no params present
+                if (parameterizedClass.parameters == null || 
parameterizedClass.parameters.isEmpty())
+                {
+                    Constructor<?> constructor = 
providerClass.getConstructor();
+                    K instance = (K) constructor.newInstance();
+                    return instance;
+                }
+            }
+            // there are about 5 checked exceptions that could be thrown here.
+            catch (Exception e)
+            {
+                last = e;
+            }
+        }
+        throw new ConfigurationException("Unable to create parameterized class 
" + parameterizedClass.class_name, last);
+    }
+
     @Override
     public boolean equals(Object that)
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AddIdentityStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/AddIdentityStatement.java
new file mode 100644
index 0000000000..b11801d4ea
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/AddIdentityStatement.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+/**
+ * Cqlsh statement to add identity into roles_to_identity table for storing 
authorized identities for mTLS connections.
+ * Performs some checks before adding the identity to roles table.
+ *
+ * EX: ADD IDENTITY 'testIdentity' TO ROLE 'testRole'
+ */
+public class AddIdentityStatement extends AuthenticationStatement
+{
+    final String identity;
+    final String role;
+    final boolean ifNotExists;
+
+    public AddIdentityStatement(String identity, String role, boolean 
ifNotExists)
+    {
+        this.role = role;
+        this.identity = identity;
+        this.ifNotExists = ifNotExists;
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        checkPermission(state, Permission.CREATE, 
state.getUser().getPrimaryRole());
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        state.ensureNotAnonymous();
+
+        if 
(!DatabaseDescriptor.getRoleManager().isExistingRole(RoleResource.role(role)))
+        {
+            throw new InvalidRequestException(String.format("Can not add 
identity for non-existent role '%s'", role));
+        }
+
+        if (!ifNotExists && 
DatabaseDescriptor.getRoleManager().isExistingIdentity(identity))
+            throw new InvalidRequestException(String.format("%s already 
exists", identity));
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.CREATE_IDENTITY);
+    }
+
+    @Override
+    public ResultMessage execute(ClientState state) throws 
RequestExecutionException, RequestValidationException
+    {
+        if(!ifNotExists || 
!DatabaseDescriptor.getRoleManager().isExistingIdentity(identity))
+        {
+            DatabaseDescriptor.getRoleManager().addIdentity(identity, role);
+        }
+        return null;
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/DropIdentityStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/DropIdentityStatement.java
new file mode 100644
index 0000000000..732f28a489
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/DropIdentityStatement.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.apache.cassandra.audit.AuditLogContext;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+/**
+ * Cqlsh statement to remove identity from identity_to_roles table.
+ * Ex: DROP IDENTITY 'testIdentity'
+ */
+public class DropIdentityStatement extends AuthenticationStatement
+{
+    final String identity;
+    final boolean ifExists;
+
+    public DropIdentityStatement(String identity, boolean ifExists)
+    {
+        this.identity = identity;
+        this.ifExists = ifExists;
+    }
+
+    @Override
+    public void authorize(ClientState state)
+    {
+        checkPermission(state, Permission.DROP, 
state.getUser().getPrimaryRole());
+    }
+
+    @Override
+    public void validate(ClientState state)
+    {
+        state.ensureNotAnonymous();
+
+        if (!ifExists && 
!DatabaseDescriptor.getRoleManager().isExistingIdentity(identity))
+        {
+            throw new InvalidRequestException(String.format("identity '%s' 
doesn't exist", identity));
+        }
+    }
+
+    @Override
+    public AuditLogContext getAuditLogContext()
+    {
+        return new AuditLogContext(AuditLogEntryType.DROP_IDENTITY);
+    }
+
+    @Override
+    public ResultMessage execute(ClientState state) throws 
RequestExecutionException, RequestValidationException
+    {
+        // not rejected in validate()
+        if(!ifExists || 
DatabaseDescriptor.getRoleManager().isExistingIdentity(identity))
+        {
+            DatabaseDescriptor.getRoleManager().dropIdentity(identity);
+        }
+        return null;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index d4ada4e2a2..c3c5fbba09 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1092,6 +1092,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             if (DatabaseDescriptor.getReplaceTokens().size() > 0 || 
DatabaseDescriptor.getReplaceNode() != null)
                 throw new RuntimeException("Replace method removed; use " + 
REPLACE_ADDRESS.getKey() + " system property instead.");
 
+            DatabaseDescriptor.getInternodeAuthenticator().setupInternode();
             MessagingService.instance().listen();
 
             UUID localHostId = SystemKeyspace.getOrInitializeLocalHostId();
@@ -1371,7 +1372,6 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
             DatabaseDescriptor.getRoleManager().setup();
             DatabaseDescriptor.getAuthenticator().setup();
-            DatabaseDescriptor.getInternodeAuthenticator().setupInternode();
             DatabaseDescriptor.getAuthorizer().setup();
             DatabaseDescriptor.getNetworkAuthorizer().setup();
             AuthCacheService.initializeAndRegisterCaches();
diff --git a/test/conf/cassandra-mtls-backward-compatibility.yaml 
b/test/conf/cassandra-mtls-backward-compatibility.yaml
new file mode 100644
index 0000000000..fe1b8ea326
--- /dev/null
+++ b/test/conf/cassandra-mtls-backward-compatibility.yaml
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# This file is used for testing mTLS authenticators
+#
+cluster_name: Test Cluster
+memtable_allocation_type: offheap_objects
+commitlog_sync: batch
+commitlog_sync_batch_window_in_ms: 1.0
+commitlog_segment_size: 5MiB
+commitlog_directory: build/test/cassandra/commitlog
+cdc_raw_directory: build/test/cassandra/cdc_raw
+cdc_enabled: false
+hints_directory: build/test/cassandra/hints
+partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
+listen_address: 127.0.0.1
+storage_port: 7012
+ssl_storage_port: 17012
+start_native_transport: true
+native_transport_port: 9042
+column_index_size: 4KiB
+saved_caches_directory: build/test/cassandra/saved_caches
+data_file_directories:
+  - build/test/cassandra/data
+disk_access_mode: mmap
+seed_provider:
+  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
+    parameters:
+      - seeds: "127.0.0.1:7012"
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+dynamic_snitch: true
+incremental_backups: true
+concurrent_compactors: 4
+compaction_throughput: 0MiB/s
+row_cache_class_name: org.apache.cassandra.cache.OHCProvider
+row_cache_size: 16MiB
+user_defined_functions_enabled: true
+scripted_user_defined_functions_enabled: true
+prepared_statements_cache_size: 1MiB
+corrupted_tombstone_strategy: exception
+stream_entire_sstables: true
+stream_throughput_outbound: 23841858MiB/s
+sasi_indexes_enabled: true
+materialized_views_enabled: true
+drop_compact_storage_enabled: true
+file_cache_enabled: true
+auto_hints_cleanup_enabled: true
+default_keyspace_rf: 1
+
+server_encryption_options:
+  internode_encryption: none
+  keystore: test/conf/cassandra_ssl_test.keystore
+  keystore_password: cassandra
+  outbound_keystore: test/conf/cassandra_ssl_test_outbound.keystore
+  outbound_keystore_password: cassandra
+  truststore: test/conf/cassandra_ssl_test.truststore
+  truststore_password: cassandra
+  require_client_auth: true
+internode_authenticator: 
org.apache.cassandra.auth.AllowAllInternodeAuthenticator
+authenticator: org.apache.cassandra.auth.AllowAllAuthenticator
diff --git a/test/conf/cassandra-mtls.yaml b/test/conf/cassandra-mtls.yaml
new file mode 100644
index 0000000000..924f55cedf
--- /dev/null
+++ b/test/conf/cassandra-mtls.yaml
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# This file is used for testing mTLS authenticators
+#
+cluster_name: Test Cluster
+memtable_allocation_type: offheap_objects
+commitlog_sync: batch
+commitlog_sync_batch_window_in_ms: 1.0
+commitlog_segment_size: 5MiB
+commitlog_directory: build/test/cassandra/commitlog
+cdc_raw_directory: build/test/cassandra/cdc_raw
+cdc_enabled: false
+hints_directory: build/test/cassandra/hints
+partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
+listen_address: 127.0.0.1
+storage_port: 7012
+ssl_storage_port: 17012
+start_native_transport: true
+native_transport_port: 9042
+column_index_size: 4KiB
+saved_caches_directory: build/test/cassandra/saved_caches
+data_file_directories:
+  - build/test/cassandra/data
+disk_access_mode: mmap
+seed_provider:
+  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
+    parameters:
+      - seeds: "127.0.0.1:7012"
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+dynamic_snitch: true
+incremental_backups: true
+concurrent_compactors: 4
+compaction_throughput: 0MiB/s
+row_cache_class_name: org.apache.cassandra.cache.OHCProvider
+row_cache_size: 16MiB
+prepared_statements_cache_size: 1MiB
+corrupted_tombstone_strategy: exception
+stream_entire_sstables: true
+stream_throughput_outbound: 23841858MiB/s
+sasi_indexes_enabled: true
+materialized_views_enabled: true
+drop_compact_storage_enabled: true
+file_cache_enabled: true
+auto_hints_cleanup_enabled: true
+default_keyspace_rf: 1
+
+client_encryption_options:
+  enabled: true
+  require_client_auth: true
+  keystore: test/conf/cassandra_ssl_test.keystore
+  keystore_password: cassandra
+  truststore: test/conf/cassandra_ssl_test.truststore
+  truststore_password: cassandra
+
+server_encryption_options:
+  internode_encryption: all
+  enabled: true
+  keystore: test/conf/cassandra_ssl_test.keystore
+  keystore_password: cassandra
+  outbound_keystore: test/conf/cassandra_ssl_test_outbound.keystore
+  outbound_keystore_password: cassandra
+  truststore: test/conf/cassandra_ssl_test.truststore
+  truststore_password: cassandra
+  require_client_auth: true
+internode_authenticator:
+  class_name : org.apache.cassandra.auth.MutualTlsInternodeAuthenticator
+  parameters :
+    validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
+authenticator:
+  class_name : org.apache.cassandra.auth.MutualTlsAuthenticator
+  parameters :
+    validator_class_name: org.apache.cassandra.auth.SpiffeCertificateValidator
diff --git a/test/conf/cassandra_ssl_test.truststore 
b/test/conf/cassandra_ssl_test.truststore
index ab01af30cd..10abf12f53 100644
Binary files a/test/conf/cassandra_ssl_test.truststore and 
b/test/conf/cassandra_ssl_test.truststore differ
diff --git a/test/conf/cassandra_ssl_test_outbound.keystore 
b/test/conf/cassandra_ssl_test_outbound.keystore
index 7dbf466e5a..fc5d117b04 100644
Binary files a/test/conf/cassandra_ssl_test_outbound.keystore and 
b/test/conf/cassandra_ssl_test_outbound.keystore differ
diff --git a/test/resources/auth/SampleInvalidCertificate.pem 
b/test/resources/auth/SampleInvalidCertificate.pem
new file mode 100644
index 0000000000..dce4a10de4
--- /dev/null
+++ b/test/resources/auth/SampleInvalidCertificate.pem
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDejCCAmICCQCHse88ehxeVjANBgkqhkiG9w0BAQsFADB/MQswCQYDVQQGEwJ1
+czELMAkGA1UECAwCY2ExEjAQBgNVBAcMCXN1bm55dmFsZTEOMAwGA1UECgwFYXBw
+bGUxDDAKBgNVBAsMA3BpZTESMBAGA1UEAwwJbG9jYWxob3N0MR0wGwYJKoZIhvcN
+AQkBFg50ZXN0QGFwcGxlLmNvbTAeFw0xOTAzMjEyMTI3NDNaFw0yMDAzMjAyMTI3
+NDNaMH8xCzAJBgNVBAYTAnVzMQswCQYDVQQIDAJjYTESMBAGA1UEBwwJc3Vubnl2
+YWxlMQ4wDAYDVQQKDAVhcHBsZTEMMAoGA1UECwwDcGllMRIwEAYDVQQDDAlsb2Nh
+bGhvc3QxHTAbBgkqhkiG9w0BCQEWDnRlc3RAYXBwbGUuY29tMIIBIjANBgkqhkiG
+9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxOYvntrvB41GiOLP0U4vv/dRdotnX3bgXDG4
+na5SW4X7x8BbT6Kg2FVlgsEMk/26id/isAXuo8uWmyAwL0r522No5pXBlc6QzQr4
+VR8xAnLkBzuIH4R0eVOGOpf/k4IKem1dQjRap5SCKcMm8oVwGTwiDZNNCSb8uBSX
+opnINMa2GjWGoe1WLGBVaHm2gHMpcoyXIOgp1bIu6UohII713DXw1A3OfICa0JIE
+xYhfRr9wTNreIWc6lLEQkdFe7qTo/nn1bGJkkV/pcxw3GwB3arh+B2JtD3y+Tvw1
+EyLThaCkr5DoH7M4Rt1/plyJZ4sdEO47bpHhqVrkO7oKj0eY7QIDAQABMA0GCSqG
+SIb3DQEBCwUAA4IBAQCcdisPpyWK4YN/QEs5ao3o3cNO7XjBzndghsQqqPtZYh6t
+MHHEk9uR4lw6VChKE4KFquR3ukHTsR913i+EjdTXpstj1HenzlLsPG3D7C4CQRpT
+D7s9lgpF2W40y7ZlX07yRzZPJFZX4aApEQvALTIwloP7+EZVAkRDOxIX7CogJU0/
+ngiF5Scaxr61G4cuNLhxbJaw7OdYp/a/6Qh2hkHmrCSs8MfOozCN79HeAXjit/vZ
+eBMxx+8nPPjP7Um0GO2ESkWYvjlM5P/WjaySHbzwpUkVvBWKmY/lfgyKcR9pwFQy
+pwaGg9+XcoVrpW5RHjp/ue6xIxqB4T0RoY9kDDIm
+-----END CERTIFICATE-----
diff --git a/test/resources/auth/SampleMtlsClientCertificate.pem 
b/test/resources/auth/SampleMtlsClientCertificate.pem
new file mode 100644
index 0000000000..ae02ba5929
--- /dev/null
+++ b/test/resources/auth/SampleMtlsClientCertificate.pem
@@ -0,0 +1,29 @@
+-----BEGIN CERTIFICATE-----
+MIIFDDCCAvSgAwIBAgIJAI/Ret4ZZJ5sMA0GCSqGSIb3DQEBCwUAMCUxIzAhBgNV
+BAMMGm10bHMtYXV0aGVudGljYXRvci10ZXN0aW5nMB4XDTIzMDEwMzIyNTU1OVoX
+DTMyMTIzMTIyNTU1OVowJTEjMCEGA1UEAwwabXRscy1hdXRoZW50aWNhdG9yLXRl
+c3RpbmcwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQCzhMGvT9Sn2SXQ
+fSbacWj2E20RoZhnrBdzOX+CHX1Jx4juNjGIfJPPCwPJ4I2RvSou7Bw+QiwnWmy9
+8gkU1FK928VzzH5Gmw99LN8iqNCUdltylLMkr2dN2kgX0K2DC04ETK1Vosd1X6+a
+6QescH10I3zzEuw9KQg/CzvzqjeBgdWyXw3e8MPctMcM6S5TPiI/BqGEJNz3a0DV
+dFkXQL0y7ya7gqkRk+m4sK90NNcJk67b3Vbo8+FWmigOLoGYXP4HyoIHZVicpEp/
+a9YmcAQ6azabQdBzFWxGkAkijwtnVLYQtGMqiKZkbEstbzaIlCL+F3t3P+S64A+i
+dmo60pXkYaB0JQ8Kg8gI5yNy8IvKqUuzVJlGf5qCcoAV5udROi5GRLsQd904sWwq
+jQWSVR4HTy94Nrf0yUemACtwe+5XLSmW2rQaRTWVMh/vpd2EhMmjZ72FVBCk6pzk
+ZIKQJO1A6baU/At+8BV/LWopGxcIUn3Lzffx+xLVevNr6LuunKGQkt9XASH7pYV5
+KBt9gR6fAmp68n+Y7EE/2+9Rp7X6CNUec+4z+xGSZqfGGNB/KjpRyppN64oMf2xt
+gRzihN71ZQd/+Sc9nYqVcN5ps6SrnrKKoSIzZCuJQv8hQd4z4K8vlWj1cq7kLNXL
+JMhBLQSpUyHYtRInSJlgFpq+XplsQQIDAQABoz8wPTA7BgNVHREENDAyhjBzcGlm
+ZmU6Ly90ZXN0ZG9tYWluLmNvbS90ZXN0SWRlbnRpZmllci90ZXN0VmFsdWUwDQYJ
+KoZIhvcNAQELBQADggIBAKA+PEOS2ZH/tu5x4q+v28uFxo472itfgOlVWZrpQX6g
+B4HbPRv6lBZW/ko+jlJspIPiDmYTRFZQ4WKG0+sW0yW2k+sUQ1Ee+Aql6M0Gqi6P
+OdOmTai5+OfpcLLg0ANNohf9BKeavpqaTauNG1adnaIHKxcZjBJ305eOZvr2sgKz
+LtsqmnO54ADbBFsS+RIR4hacLs/P8gWr1j3Tujv37vddE3l76urGEIHx8x9gJydG
+++/mcrq6Oom6Npph5vWRLCU15WDWnIvttKcBNW7LcvaiaXQxX76gZvIkzLZr3w5z
+SZE1VI+rXZkluvqHIGSNxQGAcLwkDOimymFDUONZu7GMof6yi9zEHVR8kNOmb8Xe
+Nwr7DkfAuDvTzhau+/yNvcYW4LW/5aerw+1hQoD15HWAtjylbzXAAvsPX3VkfLJS
+p6iOMuZJ+p1veys1bMteVu95s9bD/aMicSV79n3QF7zfnsR1SowR3txPsgLOwxdP
+F/JfpUXG7OCzFM1WxUrUQR+VQ6YX4yM8m+EttXF5gXOzzBzb0LBrVw20VIlrvZXB
+vx4LWo9A5J2Gp/0IhwCrAU68Wjok9gP5HQ9/pSbFXDiffY+O2RflqdWJIfmdos4b
+JlxAXwXN/6yQp/Rj9Jz+rn2I8TmVMw6tFS5psh9nvXA7KLoFYoBBOsbL6P7HtEK0
+-----END CERTIFICATE-----
\ No newline at end of file
diff --git a/test/resources/auth/SampleUnauthorizedMtlsClientCertificate.pem 
b/test/resources/auth/SampleUnauthorizedMtlsClientCertificate.pem
new file mode 100644
index 0000000000..34f682ad4c
--- /dev/null
+++ b/test/resources/auth/SampleUnauthorizedMtlsClientCertificate.pem
@@ -0,0 +1,29 @@
+-----BEGIN CERTIFICATE-----
+MIIFCTCCAvGgAwIBAgIJAKp41MPbHc4yMA0GCSqGSIb3DQEBCwUAMCAxHjAcBgNV
+BAMMFWF1dGhlbnRpY2F0b3ItdGVzdGluZzAeFw0yMzAyMDcyMTMwMzZaFw0zMzAy
+MDQyMTMwMzZaMCAxHjAcBgNVBAMMFWF1dGhlbnRpY2F0b3ItdGVzdGluZzCCAiIw
+DQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAMYQ1POS39c4RyOFH9fjRGjlHV5i
+5eit5+HY5Y2ID4mSwY1EBEcWvZvJSeS0B1M/tj3DNNcVsYm/Cf1EnIKxzdonIN97
+LZFMf3IE1getxopPDryGfjFHBUVW+wGWS0w5214PAAPWRbYzzH7fiXkPnk1QTYYs
+EShHhV5+lTVETJb4PSqyc83jwDJYfiSmXPVSN7NH/gLs6CPqhjPwO520BXVyzF3D
+lRDkhYjzk8NMecdW3/Ztw9FJxS1EITyiAwcJcoCu+u0tX9XWQeUw9MAm2wPPp/iU
+BUCGQ86LY3szTmEcEG3EOFj0j+UrNJs01vGQf21o7JO+lDWhbuxncv7+zIxQqRA4
+1yaa7HxDNrSl/WmJvhAKKkuRtwuZhWb+IP0LEjsAVB+X5dGraVeYbpLGZJec2Xis
+X7v6UGua/L3LswjsovQIk8ou2j0LExEn+jOk+O8JLHzxrjBafSLS9D9g5akC4qor
+LopCMEfVEAcvscw7sadrUeyxVAVZE69Mp6Qn0PoV6IVMr36uzZhTMK123qS8ynho
+FzgA6IlrmM71UvlVPx+rpApKypMoPYVsU8nPMqVqwxQC1hR/TrWosL7nG/fUSrxe
+QCK9dLJhJ0v8CP2PwXGyDctdQj2BWxaTKmuvXFv1MjbxBNOZPcW4STuH8/RA3tmA
+tNfenfbuiDyC0lcvAgMBAAGjRjBEMEIGA1UdEQQ7MDmGN3NwaWZmZTovL3Rlc3Rk
+b21haW4uY29tL3Rlc3RJZGVudGlmaWVyL3Rlc3RVbmF1dGhvcml6ZWQwDQYJKoZI
+hvcNAQELBQADggIBAELimG+Aganv39Q2Rvc2rJKxhZQQak+G4qf1Ql7tzWdqIiBs
+4AUHI1Kc75SoIWKXtkDBdL28OdMqyEWoTJ4huJ+jJiBMYphaguQdXx0ZY/kmdiJb
+2zl6WFQMsZdfDRfMYypGNs/ieN65u4cnQKJxwME0OKdhflNuuydXxAAgUrygXSGN
+7J3GOhcPPAMiaxj+/aOeL/+HPt0tQKGrGC1W6zjrJznv1hD4pHo6Ch9GuTh1cWSc
+b6P4rSDb+NIshq6R+pB3FjcqT/lkewVAYlSTHI+yXt8sfLFqDJKUe0BlgNA0zTSA
+LuJNHzxtruIpMiRNy+SSxWoNc/R86eMnOxMBIp61wtonzlIhyb9VHy9kvpO9v0ru
+ijt2skEIAcNznxwX5nxNvG+byVytdNXpVZ1qSsC3QlS4cYhzMhpPG4X7S/O9cf03
+Sst3WKF7ZOKpmef5IxetlU+vbwfq86CzTzXP5wmXCK1y7Pys+jxlU6bzYkU1sKb2
+GPnkTU2gNncRmkatwtk+BhgVp6glDcjdN6sUY/SSAlLc5XiFz9rhvZa4+P8DjjoJ
+wJpzZWAqDShs0sJ6VvUFngw0sw/U2OmJcLHpoTTH/Iw8Qa+9p7rlRxvud8RIk7Mm
+0YXU84vP/QmCKhHfkbhoDRUTwCxRvP7PUj10Jtrrjj+X5p+QFyrVuBCojc8k
+-----END CERTIFICATE-----
diff --git a/test/unit/org/apache/cassandra/audit/AuditLoggerAuthTest.java 
b/test/unit/org/apache/cassandra/audit/AuditLoggerAuthTest.java
index 0ced5a10e7..50d20ea883 100644
--- a/test/unit/org/apache/cassandra/audit/AuditLoggerAuthTest.java
+++ b/test/unit/org/apache/cassandra/audit/AuditLoggerAuthTest.java
@@ -69,7 +69,7 @@ public class AuditLoggerAuthTest
     public static void setup() throws Exception
     {
         OverrideConfigurationLoader.override((config) -> {
-            config.authenticator = "PasswordAuthenticator";
+            config.authenticator = new 
ParameterizedClass("PasswordAuthenticator");
             config.role_manager = "CassandraRoleManager";
             config.authorizer = "CassandraAuthorizer";
             config.audit_logging_options.enabled = true;
diff --git a/test/unit/org/apache/cassandra/auth/AuthConfigTest.java 
b/test/unit/org/apache/cassandra/auth/AuthConfigTest.java
new file mode 100644
index 0000000000..b9bde913be
--- /dev/null
+++ b/test/unit/org/apache/cassandra/auth/AuthConfigTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.io.IOException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static org.apache.cassandra.auth.AuthTestUtils.loadCertificateChain;
+import static 
org.apache.cassandra.auth.IInternodeAuthenticator.InternodeConnectionDirection.INBOUND;
+import static org.apache.cassandra.config.YamlConfigurationLoaderTest.load;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class AuthConfigTest
+{
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testNewInstanceForMutualTlsInternodeAuthenticator() throws 
IOException, CertificateException
+    {
+        Config config = load("cassandra-mtls.yaml");
+        config.internode_authenticator.class_name = 
"org.apache.cassandra.auth.MutualTlsInternodeAuthenticator";
+        config.internode_authenticator.parameters = 
Collections.singletonMap("validator_class_name", 
"org.apache.cassandra.auth.SpiffeCertificateValidator");
+        config.server_encryption_options = 
config.server_encryption_options.withOutboundKeystore("test/conf/cassandra_ssl_test_outbound.keystore")
+                                                                           
.withOutboundKeystorePassword("cassandra");
+        DatabaseDescriptor.setConfig(config);
+        MutualTlsInternodeAuthenticator authenticator = 
ParameterizedClass.newInstance(config.internode_authenticator,
+                                                                               
        Arrays.asList("", "org.apache.cassandra.auth."));
+
+        InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.1");
+
+        Certificate[] authorizedCertificates = 
loadCertificateChain("auth/SampleMtlsClientCertificate.pem");
+        assertTrue(authenticator.authenticate(address.getAddress(), 
address.getPort(), authorizedCertificates, INBOUND));
+
+        Certificate[] unauthorizedCertificates = 
loadCertificateChain("auth/SampleUnauthorizedMtlsClientCertificate.pem");
+        assertFalse(authenticator.authenticate(address.getAddress(), 
address.getPort(), unauthorizedCertificates, INBOUND));
+    }
+
+    @Test
+    public void testNewInstanceForMutualTlsWithPasswordFallbackAuthenticator()
+    {
+        Config config = load("cassandra-mtls.yaml");
+        config.client_encryption_options.applyConfig();
+        config.authenticator.class_name = 
"org.apache.cassandra.auth.MutualTlsWithPasswordFallbackAuthenticator";
+        config.authenticator.parameters = 
Collections.singletonMap("validator_class_name", 
"org.apache.cassandra.auth.SpiffeCertificateValidator");
+        DatabaseDescriptor.setConfig(config);
+        MutualTlsWithPasswordFallbackAuthenticator authenticator = 
ParameterizedClass.newInstance(config.authenticator,
+                                                                               
                   Arrays.asList("", "org.apache.cassandra.auth."));
+        assertNotNull(authenticator);
+        unregisterIdentitesCache();
+    }
+
+    @Test
+    public void testNewInstanceForMutualTlsAuthenticator() throws IOException, 
CertificateException
+    {
+        Config config = load("cassandra-mtls.yaml");
+        config.client_encryption_options.applyConfig();
+        DatabaseDescriptor.setConfig(config);
+        MutualTlsAuthenticator authenticator = 
ParameterizedClass.newInstance(config.authenticator,
+                                                                              
Arrays.asList("", "org.apache.cassandra.auth."));
+        assertNotNull(authenticator);
+        unregisterIdentitesCache();
+    }
+
+    private void unregisterIdentitesCache()
+    {
+        
MBeanWrapper.instance.unregisterMBean("org.apache.cassandra.auth:type=IdentitiesCache");
+    }
+}
diff --git a/test/unit/org/apache/cassandra/auth/AuthTestUtils.java 
b/test/unit/org/apache/cassandra/auth/AuthTestUtils.java
index c78520e06f..e3ef505be9 100644
--- a/test/unit/org/apache/cassandra/auth/AuthTestUtils.java
+++ b/test/unit/org/apache/cassandra/auth/AuthTestUtils.java
@@ -18,7 +18,18 @@
 
 package org.apache.cassandra.auth;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.auth.jmx.AuthorizationProxy;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -32,8 +43,11 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.messages.ResultMessage;
 
+import static org.junit.Assert.assertNotNull;
+
 
 public class AuthTestUtils
 {
@@ -70,6 +84,12 @@ public class AuthTestUtils
             return QueryProcessor.executeInternal(query);
         }
 
+        @Override
+        UntypedResultSet process(String query, ConsistencyLevel 
consistencyLevel, ByteBuffer... values)
+        {
+            return QueryProcessor.executeInternal(query, (Object[]) values);
+        }
+
         @Override
         protected void scheduleSetupTask(final Callable<Void> setupTask)
         {
@@ -171,4 +191,30 @@ public class AuthTestUtils
         roleOptions.setOption(IRoleManager.Option.PASSWORD, "ignored");
         return roleOptions;
     }
+
+    // mTLS authenticators related utility methods
+    public static InetAddress getMockInetAddress() throws UnknownHostException
+    {
+        return InetAddress.getByName("127.0.0.1");
+    }
+
+    public static Certificate[] loadCertificateChain(final String path) throws 
CertificateException
+    {
+        InputStream inputStream = 
MutualTlsAuthenticator.class.getClassLoader().getResourceAsStream(path);
+        assertNotNull(inputStream);
+        Collection<? extends Certificate> c = 
CertificateFactory.getInstance("X.509").generateCertificates(inputStream);
+        X509Certificate[] certs = new X509Certificate[c.size()];
+        for (int i = 0; i < certs.length; i++)
+        {
+            certs[i] = (X509Certificate) c.toArray()[i];
+        }
+        return certs;
+    }
+
+    public static void initializeIdentityRolesTable(final String identity) 
throws IOException, TimeoutException
+    {
+        StorageService.instance.truncate(SchemaConstants.AUTH_KEYSPACE_NAME, 
AuthKeyspace.IDENTITY_TO_ROLES);
+        String insertQuery = "Insert into %s.%s (identity, role) values ('%s', 
'readonly_user');";
+        QueryProcessor.process(String.format(insertQuery, 
SchemaConstants.AUTH_KEYSPACE_NAME, AuthKeyspace.IDENTITY_TO_ROLES, identity), 
ConsistencyLevel.ONE);
+    }
 }
diff --git 
a/test/unit/org/apache/cassandra/auth/MutualTlsAuthenticatorTest.java 
b/test/unit/org/apache/cassandra/auth/MutualTlsAuthenticatorTest.java
new file mode 100644
index 0000000000..0dc7984f26
--- /dev/null
+++ b/test/unit/org/apache/cassandra/auth/MutualTlsAuthenticatorTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.io.IOException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.AuthenticationException;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+import static org.apache.cassandra.auth.AuthTestUtils.getMockInetAddress;
+import static 
org.apache.cassandra.auth.AuthTestUtils.initializeIdentityRolesTable;
+import static org.apache.cassandra.auth.AuthTestUtils.loadCertificateChain;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+
+@RunWith(Parameterized.class)
+public class MutualTlsAuthenticatorTest
+{
+    @Parameterized.Parameter(0)
+    public String certificatePath;
+    @Parameterized.Parameter(1)
+    public String identity;
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Parameterized.Parameters()
+    public static Collection<Object[]> versions()
+    {
+        return Collections.singletonList(new Object[]{ 
"auth/SampleMtlsClientCertificate.pem", 
"spiffe://testdomain.com/testIdentifier/testValue" });
+    }
+
+    @BeforeClass
+    public static void setup()
+    {
+        SchemaLoader.loadSchema();
+        DatabaseDescriptor.daemonInitialization();
+        StorageService.instance.initServer(0);
+        
((CassandraRoleManager)DatabaseDescriptor.getRoleManager()).loadIdentityStatement();
+        final Config config = DatabaseDescriptor.getRawConfig();
+        config.client_encryption_options = 
config.client_encryption_options.withEnabled(true)
+                                                                           
.withRequireClientAuth(true);
+    }
+
+    @After
+    public void after() throws IOException, TimeoutException
+    {
+        
MBeanWrapper.instance.unregisterMBean("org.apache.cassandra.auth:type=IdentitiesCache");
+        StorageService.instance.truncate(SchemaConstants.AUTH_KEYSPACE_NAME, 
AuthKeyspace.IDENTITY_TO_ROLES);
+    }
+
+    String getValidatorClass()
+    {
+        return "org.apache.cassandra.auth.SpiffeCertificateValidator";
+    }
+
+    @Test
+    public void testAuthorizedUsers() throws CertificateException, 
IOException, TimeoutException
+    {
+        initializeIdentityRolesTable(identity);
+        Certificate[] chain = loadCertificateChain(certificatePath);
+
+        // Verify authenticated user is as expected
+        IAuthenticator mutualTlsAuthenticator = 
createAndInitializeMtlsAuthenticator();
+        IAuthenticator.SaslNegotiator saslNegotiator = 
mutualTlsAuthenticator.newSaslNegotiator(getMockInetAddress(), chain);
+        AuthenticatedUser authenticatedUser = 
saslNegotiator.getAuthenticatedUser();
+        assertNotNull(authenticatedUser);
+        assertEquals("readonly_user", authenticatedUser.getName());
+    }
+
+    @Test
+    public void testUnauthorizedUsers() throws CertificateException, 
IOException, TimeoutException
+    {
+        // As identity of certificate is not added to identity_role_table, 
connection should fail
+        Certificate[] chain = loadCertificateChain(certificatePath);
+        IAuthenticator mutualTlsAuthenticator = 
createAndInitializeMtlsAuthenticator();
+        IAuthenticator.SaslNegotiator saslNegotiator = 
mutualTlsAuthenticator.newSaslNegotiator(getMockInetAddress(), chain);
+        expectedException.expect(AuthenticationException.class);
+        expectedException.expectMessage(String.format("Certificate identity 
'%s' not authorized", identity));
+        saslNegotiator.getAuthenticatedUser();
+    }
+
+    @Test
+    public void testInvalidUsers() throws CertificateException, IOException, 
TimeoutException
+    {
+        initializeIdentityRolesTable(identity);
+        Certificate[] clientCertificates = 
loadCertificateChain("auth/SampleInvalidCertificate.pem");
+        IAuthenticator mutualTlsAuthenticator = 
createAndInitializeMtlsAuthenticator();
+        IAuthenticator.SaslNegotiator saslNegotiator = 
mutualTlsAuthenticator.newSaslNegotiator(getMockInetAddress(), 
clientCertificates);
+        expectedException.expect(AuthenticationException.class);
+        expectedException.expectMessage("Unable to extract Spiffe from the 
certificate");
+        saslNegotiator.getAuthenticatedUser();
+    }
+
+    @Test
+    public void testChangeInValidUrns() throws CertificateException, 
IOException, TimeoutException
+    {
+        DatabaseDescriptor.setCredentialsValidity(10);
+        initializeIdentityRolesTable(identity);
+        Certificate[] chain = loadCertificateChain(certificatePath);
+        IAuthenticator mutualTlsAuthenticator = 
createAndInitializeMtlsAuthenticator();
+        IAuthenticator.SaslNegotiator saslNegotiator = 
mutualTlsAuthenticator.newSaslNegotiator(getMockInetAddress(), chain);
+        assertEquals("readonly_user", 
saslNegotiator.getAuthenticatedUser().getName());
+        // following call truncates identity table. After removing the 
identity of certificate, we should get
+        // authentication exception
+        initializeIdentityRolesTable("another_id");
+        expectedException.expect(AuthenticationException.class);
+        expectedException.expectMessage(String.format("Certificate identity 
'%s' not authorized", identity));
+        saslNegotiator.getAuthenticatedUser();
+    }
+
+    @Test
+    public void testValidatorClassNameIsNotSet()
+    {
+        expectedException.expect(ConfigurationException.class);
+        
expectedException.expectMessage("authenticator.parameters.validator_class_name 
is not set");
+        new MutualTlsAuthenticator(Collections.emptyMap());
+    }
+
+    @Test
+    public void testAddingAndRemovingIdentitiesToTableReflectsInCache() throws 
IOException, TimeoutException
+    {
+        DatabaseDescriptor.setCredentialsValidity(10);
+        String identity1 = "id1";
+        String identity2 = "id2";
+
+        initializeIdentityRolesTable(identity1);
+        MutualTlsAuthenticator.IdentityCache urnCache = new 
MutualTlsAuthenticator.IdentityCache();
+        assertEquals("readonly_user", urnCache.get(identity1));
+
+        initializeIdentityRolesTable(identity2);
+        assertNull(urnCache.get(identity1));
+        assertEquals("readonly_user", urnCache.get(identity2));
+    }
+
+    MutualTlsAuthenticator createAndInitializeMtlsAuthenticator()
+    {
+        Map<String, String> parameters = 
Collections.singletonMap("validator_class_name", getValidatorClass());
+        MutualTlsAuthenticator mutualTlsAuthenticator = new 
MutualTlsAuthenticator(parameters);
+        mutualTlsAuthenticator.setup();
+        return mutualTlsAuthenticator;
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/auth/MutualTlsInternodeAuthenticatorTest.java 
b/test/unit/org/apache/cassandra/auth/MutualTlsInternodeAuthenticatorTest.java
new file mode 100644
index 0000000000..37f8d062f9
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/auth/MutualTlsInternodeAuthenticatorTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.auth;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.auth.AuthTestUtils.loadCertificateChain;
+import static 
org.apache.cassandra.auth.IInternodeAuthenticator.InternodeConnectionDirection.INBOUND;
+import static 
org.apache.cassandra.auth.IInternodeAuthenticator.InternodeConnectionDirection.OUTBOUND;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_CONFIG;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MutualTlsInternodeAuthenticatorTest
+{
+    private static final String VALIDATOR_CLASS_NAME = "validator_class_name";
+    private static final String TRUSTED_PEER_IDENTITIES = 
"trusted_peer_identities";
+    private static final String NODE_IDENTITY = "node_identity";
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    @Parameterized.Parameter(0)
+    public String certificatePath;
+    @Parameterized.Parameter(1)
+    public String identity;
+
+    @Parameterized.Parameters()
+    public static Collection<Object[]> versions()
+    {
+        return Collections.singletonList(new Object[]{ 
"auth/SampleMtlsClientCertificate.pem", 
"spiffe://testdomain.com/testIdentifier/testValue" });
+    }
+
+    @BeforeClass
+    public static void initialize()
+    {
+        CASSANDRA_CONFIG.setString("cassandra-mtls.yaml");
+        SchemaLoader.loadSchema();
+        DatabaseDescriptor.daemonInitialization();
+        StorageService.instance.initServer(0);
+    }
+
+    @Before
+    public void before()
+    {
+        Config config = DatabaseDescriptor.getRawConfig();
+        config.server_encryption_options = 
config.server_encryption_options.withOutboundKeystore("test/conf/cassandra_ssl_test_outbound.keystore")
+                                                                           
.withOutboundKeystorePassword("cassandra");
+    }
+
+    String getValidatorClass()
+    {
+        return "org.apache.cassandra.auth.SpiffeCertificateValidator";
+    }
+
+    @Test
+    public void 
testAuthenticateWithoutCertificatesShouldThrowUnsupportedOperation() throws 
UnknownHostException
+    {
+        InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.1");
+        IInternodeAuthenticator authenticator = new 
MutualTlsInternodeAuthenticator(getParams());
+        expectedException.expect(UnsupportedOperationException.class);
+        expectedException.expectMessage("mTLS Authenticator only supports 
certificate based authenticate method");
+        authenticator.authenticate(address.getAddress(), address.getPort());
+    }
+
+    @Test
+    public void testAuthenticationOfOutboundConnectionsShouldBeSuccess() 
throws UnknownHostException
+    {
+        InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.1");
+        IInternodeAuthenticator authenticator = new 
MutualTlsInternodeAuthenticator(getParams());
+        assertTrue(authenticator.authenticate(address.getAddress(), 
address.getPort(), new Certificate[0], OUTBOUND));
+    }
+
+    @Test
+    public void testAuthorizedUsersTrustedPeersConfigured() throws 
UnknownHostException, CertificateException
+    {
+        Map<String, String> params = new HashMap<>(getParams());
+        params.put(TRUSTED_PEER_IDENTITIES, 
"spiffe://testdomain.com/testIdentifier/testValue");
+        InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.1");
+        IInternodeAuthenticator authenticator = new 
MutualTlsInternodeAuthenticator(params);
+        Certificate[] clientCertificates = 
loadCertificateChain(certificatePath);
+        assertTrue(authenticator.authenticate(address.getAddress(), 
address.getPort(), clientCertificates, INBOUND));
+    }
+
+    @Test
+    public void testAuthorizedUsersTrustedPeersNotConfigured() throws 
IOException, CertificateException
+    {
+        InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.1");
+        IInternodeAuthenticator authenticator = new 
MutualTlsInternodeAuthenticator(getParams());
+        Certificate[] clientCertificates = 
loadCertificateChain(certificatePath);
+        assertTrue(authenticator.authenticate(address.getAddress(), 
address.getPort(), clientCertificates, INBOUND));
+    }
+
+    @Test
+    public void testNodeIdentityMismatch()
+    {
+        Map<String, String> params = new HashMap<>(getParams());
+        params.put(NODE_IDENTITY, "id1");
+        expectedException.expect(ConfigurationException.class);
+        expectedException.expectMessage("Configured node identity is not 
matching identity extracted" +
+                                        "from the keystore");
+        new MutualTlsInternodeAuthenticator(params);
+    }
+
+    @Test
+    public void testUnauthorizedUser() throws IOException, 
CertificateException, TimeoutException
+    {
+        InetAddressAndPort address = InetAddressAndPort.getByName("127.0.0.1");
+        Map<String, String> parameters = getParams();
+        IInternodeAuthenticator authenticator = new 
MutualTlsInternodeAuthenticator(parameters);
+        Certificate[] clientCertificates = 
loadCertificateChain("auth/SampleUnauthorizedMtlsClientCertificate.pem");
+        assertFalse(authenticator.authenticate(address.getAddress(), 
address.getPort(), clientCertificates, INBOUND));
+    }
+
+    @Test
+    public void testNoValidatorClassNameInConfig()
+    {
+        Map<String, String> parameters = new HashMap<>(getParams());
+        parameters.put(VALIDATOR_CLASS_NAME, null);
+        expectedException.expect(ConfigurationException.class);
+        
expectedException.expectMessage("internode_authenticator.parameters.validator_class_name
 is not set");
+        new MutualTlsInternodeAuthenticator(parameters);
+    }
+
+
+    @Test
+    public void testNoIdentitiesInKeystore()
+    {
+        Config config = DatabaseDescriptor.getRawConfig();
+        config.server_encryption_options = 
config.server_encryption_options.withOutboundKeystore("test/conf/cassandra_ssl_test.keystore")
+                                                                           
.withOutboundKeystorePassword("cassandra");
+        expectedException.expect(ConfigurationException.class);
+        expectedException.expectMessage("No identity was extracted from the 
outbound keystore 'test/conf/cassandra_ssl_test.keystore'");
+        new MutualTlsInternodeAuthenticator(getParams());
+    }
+
+    @Test
+    public void testGetIdentitiesFromKeystore()
+    {
+        List<String> identities = new 
MutualTlsInternodeAuthenticator(getParams()).getIdentitiesFromKeyStore("test/conf/cassandra_ssl_test_outbound.keystore",
 "cassandra", "JKS");
+        assertFalse(identities.isEmpty());
+        assertTrue(identities.contains(identity));
+    }
+
+    Map<String, String> getParams()
+    {
+        return Collections.singletonMap(VALIDATOR_CLASS_NAME, 
getValidatorClass());
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/auth/MutualTlsWithPasswordFallbackAuthenticatorTest.java
 
b/test/unit/org/apache/cassandra/auth/MutualTlsWithPasswordFallbackAuthenticatorTest.java
new file mode 100644
index 0000000000..f4269a72fa
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/auth/MutualTlsWithPasswordFallbackAuthenticatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.io.InputStream;
+import java.net.UnknownHostException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.util.Collections;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.auth.AuthTestUtils.getMockInetAddress;
+import static org.junit.Assert.assertNotNull;
+import static org.psjava.util.AssertStatus.assertTrue;
+
+public class MutualTlsWithPasswordFallbackAuthenticatorTest
+{
+    private static MutualTlsWithPasswordFallbackAuthenticator 
fallbackAuthenticator;
+    private Certificate[] clientCertificatesCorp;
+
+    @BeforeClass
+    public static void initialize()
+    {
+        SchemaLoader.loadSchema();
+        DatabaseDescriptor.daemonInitialization();
+        StorageService.instance.initServer(0);
+        Config config = DatabaseDescriptor.getRawConfig();
+        config.client_encryption_options = 
config.client_encryption_options.withEnabled(true)
+                                                                           
.withRequireClientAuth(true);
+        Map<String, String> parameters = 
Collections.singletonMap("validator_class_name", 
"org.apache.cassandra.auth.SpiffeCertificateValidator");
+        fallbackAuthenticator = new 
MutualTlsWithPasswordFallbackAuthenticator(parameters);
+        fallbackAuthenticator.setup();
+    }
+
+    @Before
+    public void setup() throws CertificateException
+    {
+        InputStream inputStreamCorp = 
getClass().getClassLoader().getResourceAsStream("auth/SampleMtlsClientCertificate.pem");
+        assertNotNull(inputStreamCorp);
+        Certificate corpCertificate = 
CertificateFactory.getInstance("X.509").generateCertificate(inputStreamCorp);
+        clientCertificatesCorp = new Certificate[]{ corpCertificate };
+    }
+
+    @Test
+    public void testFallbackToPasswordAuthentication() throws 
UnknownHostException
+    {
+        // If client certificate chain is not present fallback to password 
authentication
+        IAuthenticator.SaslNegotiator passwordNegotiator = 
fallbackAuthenticator.newSaslNegotiator(getMockInetAddress());
+        assertTrue(passwordNegotiator instanceof 
PasswordAuthenticator.PlainTextSaslAuthenticator);
+
+        // If client certificate chain is null fallback to password 
authentication
+        IAuthenticator.SaslNegotiator passwordNegotiator1 = 
fallbackAuthenticator.newSaslNegotiator(getMockInetAddress(), null);
+        assertTrue(passwordNegotiator1 instanceof 
PasswordAuthenticator.PlainTextSaslAuthenticator);
+
+        // If client certificate chain length is zero fallback to password 
authentication
+        IAuthenticator.SaslNegotiator passwordNegotiator2 = 
fallbackAuthenticator.newSaslNegotiator(getMockInetAddress(), new 
Certificate[0]);
+        assertTrue(passwordNegotiator2 instanceof 
PasswordAuthenticator.PlainTextSaslAuthenticator);
+    }
+
+    @Test
+    public void testUsesMtlsAuthenticationWhenCertificatesPresent() throws 
UnknownHostException
+    {
+        // If client certificate chain present and valid use mTLS 
authentication
+        IAuthenticator.SaslNegotiator mutualtlsAuthenticator = 
fallbackAuthenticator.newSaslNegotiator(getMockInetAddress(), 
clientCertificatesCorp);
+        assertTrue(mutualtlsAuthenticator instanceof 
MutualTlsAuthenticator.CertificateNegotiator);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/auth/SpiffeCertificateValidatorTest.java 
b/test/unit/org/apache/cassandra/auth/SpiffeCertificateValidatorTest.java
new file mode 100644
index 0000000000..fb1083c3f0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/auth/SpiffeCertificateValidatorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.auth;
+
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.cassandra.exceptions.AuthenticationException;
+
+import static org.apache.cassandra.auth.AuthTestUtils.loadCertificateChain;
+import static org.junit.Assert.assertEquals;
+
+public class SpiffeCertificateValidatorTest
+{
+    private static final String CERTIFICATE_PATH = 
"auth/SampleMtlsClientCertificate.pem";
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void getIdentityShouldReturnSpiffeTest() throws CertificateException
+    {
+        SpiffeCertificateValidator validator = new 
SpiffeCertificateValidator();
+        Certificate[] chain = loadCertificateChain(CERTIFICATE_PATH);
+        String spiffe = validator.identity(chain);
+        assertEquals("spiffe://testdomain.com/testIdentifier/testValue", 
spiffe);
+    }
+
+    @Test
+    public void getIdentityShouldThrowExceptionOnNoSpiffeInSAN() throws 
CertificateException
+    {
+        SpiffeCertificateValidator validator = new 
SpiffeCertificateValidator();
+        String invalidCertificate = "auth/SampleInvalidCertificate.pem";
+        Certificate[] chain = loadCertificateChain(invalidCertificate);
+        expectedException.expectMessage("Unable to extract Spiffe from the 
certificate");
+        expectedException.expect(AuthenticationException.class);
+        validator.identity(chain);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/config/ConfigCompatabilityTest.java 
b/test/unit/org/apache/cassandra/config/ConfigCompatabilityTest.java
index 4a620334c3..6db68ec6b7 100644
--- a/test/unit/org/apache/cassandra/config/ConfigCompatabilityTest.java
+++ b/test/unit/org/apache/cassandra/config/ConfigCompatabilityTest.java
@@ -88,6 +88,14 @@ public class ConfigCompatabilityTest
                                                      
.add("streaming_socket_timeout_in_ms") // CASSANDRA-12229
                                                      .build();
 
+    private static final Set<String> EXPECTED_FOR_50 = 
ImmutableSet.<String>builder()
+                                                                   // Switched 
to a parameterized class that can construct from a bare string
+                                                                   
.add("internode_authenticator types do not match; 
org.apache.cassandra.config.ParameterizedClass != java.lang.String")
+                                                                   
.add("authenticator types do not match; 
org.apache.cassandra.config.ParameterizedClass != java.lang.String")
+                                                                   
.add("Property internode_authenticator used to be a value-type, but now is 
nested type class org.apache.cassandra.config.ParameterizedClass")
+                                                                   
.add("Property authenticator used to be a value-type, but now is nested type 
class org.apache.cassandra.config.ParameterizedClass")
+                                                                   .build();
+
     /**
      * Not all converts make sense as backwards compatible as they use things 
like String to handle the conversion more
      * generically.
@@ -97,13 +105,13 @@ public class ConfigCompatabilityTest
     @Test
     public void diff_3_0() throws IOException
     {
-        diff(TEST_DIR + "/version=3.0.0-alpha1.yml", REMOVED_IN_40);
+        diff(TEST_DIR + "/version=3.0.0-alpha1.yml", REMOVED_IN_40, 
EXPECTED_FOR_50);
     }
 
     @Test
     public void diff_3_11() throws IOException
     {
-        diff(TEST_DIR + "/version=3.11.0.yml", REMOVED_IN_40);
+        diff(TEST_DIR + "/version=3.11.0.yml", REMOVED_IN_40, EXPECTED_FOR_50);
     }
 
     @Test
@@ -111,10 +119,10 @@ public class ConfigCompatabilityTest
     {
         diff(TEST_DIR + "/version=4.0-alpha1.yml", 
ImmutableSet.<String>builder()
                                                         .addAll(WINDOWS)
-                                                        .build());
+                                                        .build(), 
EXPECTED_FOR_50);
     }
 
-    private void diff(String original, Set<String> ignore) throws IOException
+    private void diff(String original, Set<String> ignore, Set<String> 
expectedErrors) throws IOException
     {
         Class<Config> type = Config.class;
         ClassTree previous = load(original);
@@ -124,6 +132,7 @@ public class ConfigCompatabilityTest
         Set<String> errors = new HashSet<>();
         diff(loader, replacements, previous, type, "", missing, errors);
         missing = Sets.difference(missing, ignore);
+        errors = Sets.difference(errors, expectedErrors);
         StringBuilder msg = new StringBuilder();
         if (!missing.isEmpty())
             msg.append(String.format("Unable to find the following 
properties:\n%s", String.join("\n", new TreeSet<>(missing))));
diff --git 
a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java 
b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
index 1f7819820c..efa47b05d7 100644
--- a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
+++ b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.config;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.net.MalformedURLException;
@@ -26,6 +27,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 
 import com.google.common.collect.ImmutableMap;
@@ -41,6 +43,7 @@ import static 
org.apache.cassandra.config.YamlConfigurationLoader.SYSTEM_PROPERT
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 
@@ -387,7 +390,41 @@ public class YamlConfigurationLoaderTest
         return YamlConfigurationLoader.fromMap(builder.build(), Config.class);
     }
 
-    private static Config load(String path)
+    @Test
+    public void 
testBackwardCompatibilityOfInternodeAuthenticatorPropertyAsMap()
+    {
+        Config config = load("cassandra-mtls.yaml");
+        assertEquals(config.internode_authenticator.class_name, 
"org.apache.cassandra.auth.MutualTlsInternodeAuthenticator");
+        assertFalse(config.internode_authenticator.parameters.isEmpty());
+        
assertEquals(config.internode_authenticator.parameters.get("validator_class_name"),
 "org.apache.cassandra.auth.SpiffeCertificateValidator");
+    }
+
+    @Test
+    public void 
testBackwardCompatibilityOfInternodeAuthenticatorPropertyAsString()
+    {
+        Config config = load("cassandra-mtls-backward-compatibility.yaml");
+        assertEquals(config.internode_authenticator.class_name, 
"org.apache.cassandra.auth.AllowAllInternodeAuthenticator");
+        assertTrue(config.internode_authenticator.parameters.isEmpty());
+    }
+
+    @Test
+    public void testBackwardCompatibilityOfAuthenticatorPropertyAsMap()
+    {
+        Config config = load("cassandra-mtls.yaml");
+        assertEquals(config.authenticator.class_name, 
"org.apache.cassandra.auth.MutualTlsAuthenticator");
+        assertFalse(config.authenticator.parameters.isEmpty());
+        
assertEquals(config.authenticator.parameters.get("validator_class_name"), 
"org.apache.cassandra.auth.SpiffeCertificateValidator");
+    }
+
+    @Test
+    public void testBackwardCompatibilityOfAuthenticatorPropertyAsString() 
throws IOException, TimeoutException
+    {
+        Config config = load("cassandra-mtls-backward-compatibility.yaml");
+        assertEquals(config.authenticator.class_name, 
"org.apache.cassandra.auth.AllowAllAuthenticator");
+        assertTrue(config.authenticator.parameters.isEmpty());
+    }
+
+    public static Config load(String path)
     {
         URL url = 
YamlConfigurationLoaderTest.class.getClassLoader().getResource(path);
         if (url == null)
diff --git 
a/test/unit/org/apache/cassandra/cql3/statements/AddIdentityStatementTest.java 
b/test/unit/org/apache/cassandra/cql3/statements/AddIdentityStatementTest.java
new file mode 100644
index 0000000000..e77ae35413
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/cql3/statements/AddIdentityStatementTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.auth.AuthCacheService;
+import org.apache.cassandra.auth.AuthKeyspace;
+import org.apache.cassandra.auth.AuthTestUtils;
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.CassandraRoleManager;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+
+import static org.apache.cassandra.auth.AuthKeyspace.IDENTITY_TO_ROLES;
+import static org.apache.cassandra.schema.SchemaConstants.AUTH_KEYSPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class AddIdentityStatementTest
+{
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    private static final String USER_ROLE = "cassandra";
+    private static final String IDENTITY = 
"spiffe://testdomain.com/testIdentifier/testValue";
+    private static final String ADD_QUERY = String.format("ADD IDENTITY '%s' 
TO ROLE '%s';", IDENTITY, USER_ROLE);
+    private static void setupPrivilegedUser()
+    {
+        QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (role, 
is_superuser, can_login, salted_hash) "
+                                                     + "VALUES ('%s', true, 
true, '%s')",
+                                                     AUTH_KEYSPACE_NAME,
+                                                     AuthKeyspace.ROLES,
+                                                     
CassandraRoleManager.DEFAULT_SUPERUSER_NAME,
+                                                     "xxx"));
+    }
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.setupAuth(new AuthTestUtils.LocalCassandraRoleManager(),
+                               new AuthTestUtils.LocalPasswordAuthenticator(),
+                               new AuthTestUtils.LocalCassandraAuthorizer(),
+                               new 
AuthTestUtils.LocalCassandraNetworkAuthorizer());
+        AuthCacheService.initializeAndRegisterCaches();
+        setupPrivilegedUser();
+    }
+
+    @Before
+    public void clear()
+    {
+        
Keyspace.open(AUTH_KEYSPACE_NAME).getColumnFamilyStore(IDENTITY_TO_ROLES).truncateBlocking();
+    }
+
+    @Test
+    public void testAddIdentityStatementParsing()
+    {
+        CQLStatement.Raw statement = QueryProcessor.parseStatement(ADD_QUERY);
+        assertTrue(statement instanceof AddIdentityStatement);
+        AddIdentityStatement addIdentityStatement =  
(AddIdentityStatement)statement;
+        assertEquals(IDENTITY, addIdentityStatement.identity);
+        assertEquals(USER_ROLE, addIdentityStatement.role);
+    }
+
+    @Test
+    public void testAddingValidIdentity()
+    {
+        QueryProcessor.process(ADD_QUERY, ConsistencyLevel.QUORUM, 
getClientState(), 10L);
+        assertEquals(USER_ROLE, 
DatabaseDescriptor.getRoleManager().roleForIdentity(IDENTITY));
+    }
+
+    @Test
+    public void testAddingExistingIdentity()
+    {
+        QueryProcessor.process(ADD_QUERY, ConsistencyLevel.QUORUM, 
getClientState(), 10L);
+        expectedException.expect(InvalidRequestException.class);
+        expectedException.expectMessage(IDENTITY +" already exists");
+        QueryProcessor.process(ADD_QUERY, ConsistencyLevel.QUORUM, 
getClientState(), 10L);
+    }
+
+    @Test
+    public void testAddIdentityOnlyWhenNotPresent()
+    {
+        DatabaseDescriptor.getRoleManager().addIdentity(IDENTITY, USER_ROLE);
+        expectedException.expect(IllegalStateException.class);
+        expectedException.expectMessage("Identity is already associated with 
another role, cannot associate it with role read_write_user");
+        DatabaseDescriptor.getRoleManager().addIdentity(IDENTITY, 
"read_write_user");
+    }
+
+    @Test
+    public void testAnonymousUser()
+    {
+        expectedException.expect(UnauthorizedException.class);
+        expectedException.expectMessage("You have not logged in");
+        QueryProcessor.executeInternal(ADD_QUERY);
+    }
+
+    @Test
+    public void testAddingNonExistentRole()
+    {
+        String query = String.format("ADD IDENTITY '%s' TO ROLE 
'non-existing-role';", IDENTITY);
+        expectedException.expect(InvalidRequestException.class);
+        expectedException.expectMessage("Can not add identity for non-existent 
role 'non-existing-role'");
+        QueryProcessor.process(query, ConsistencyLevel.QUORUM, 
getClientState(), 10L);
+    }
+
+    @Test
+    public void testUsersWithNoPrevilegesCannotAddIdentitiess()
+    {
+        // Added user to roles table
+        AuthenticatedUser authenticatedUser = new 
AuthenticatedUser("readwrite_user");
+        DatabaseDescriptor.getRoleManager().createRole(authenticatedUser, 
RoleResource.role("readwrite_user"), AuthTestUtils.getLoginRoleOptions());
+        ClientState state = ClientState.forInternalCalls();
+        state.login(authenticatedUser);
+
+        String query = String.format("ADD IDENTITY '%s' TO ROLE 
'readwrite_user';", IDENTITY);
+        expectedException.expect(UnauthorizedException.class);
+        expectedException.expectMessage("User readwrite_user does not have 
sufficient privileges to perform the requested operation");
+        QueryProcessor.process(query, ConsistencyLevel.QUORUM, new 
QueryState(state), 10L);
+    }
+
+
+    @Test
+    public void creatingRoleWithIdentitiesAlreadyExisting()
+    {
+        DatabaseDescriptor.getRoleManager().addIdentity(IDENTITY, 
"readwrite_user");
+        AuthenticatedUser authenticatedUser = new 
AuthenticatedUser("cassandra");
+        expectedException.expect(IllegalStateException.class);
+        expectedException.expectMessage("Cannot create a role 'readwrite_user' 
when identities already exists for it");
+        DatabaseDescriptor.getRoleManager().createRole(authenticatedUser, 
RoleResource.role("readwrite_user"), AuthTestUtils.getLoginRoleOptions());
+    }
+
+    @Test
+    public void ifNotExistsTest()
+    {
+        // Assert that identity is not present in the table
+        
assertNull(DatabaseDescriptor.getRoleManager().roleForIdentity(IDENTITY));
+
+        String addQueryWithIfNotExists = String.format("ADD IDENTITY IF NOT 
EXISTS '%s' TO ROLE '%s';", IDENTITY, USER_ROLE);
+
+        // Identity not in the table & add identity query with IF NOT EXISTS 
should succeed
+        QueryProcessor.process(addQueryWithIfNotExists, 
ConsistencyLevel.QUORUM, getClientState(), 10L);
+        assertEquals(USER_ROLE, 
DatabaseDescriptor.getRoleManager().roleForIdentity(IDENTITY));
+
+        // Identity in the table & add identity query with IF NOT EXISTS 
should succeed
+        QueryProcessor.process(addQueryWithIfNotExists, 
ConsistencyLevel.QUORUM, getClientState(), 10L);
+        assertEquals(USER_ROLE, 
DatabaseDescriptor.getRoleManager().roleForIdentity(IDENTITY));
+
+        clear();
+        String addQueryWithOutIfNotExists = String.format("ADD IDENTITY '%s' 
TO ROLE '%s';", IDENTITY, USER_ROLE);
+        // Identity not in the table & add identity query without IF NOT 
EXISTS should succeed
+        QueryProcessor.process(addQueryWithOutIfNotExists, 
ConsistencyLevel.QUORUM, getClientState(), 10L);
+        assertEquals(USER_ROLE, 
DatabaseDescriptor.getRoleManager().roleForIdentity(IDENTITY));
+
+        // Identity in the table & add identity query without IF NOT EXISTS 
should fail
+        expectedException.expect(InvalidRequestException.class);
+        expectedException.expectMessage(IDENTITY + " already exists");
+        QueryProcessor.process(addQueryWithOutIfNotExists, 
ConsistencyLevel.QUORUM, getClientState(), 10L);
+    }
+
+    static QueryState getClientState()
+    {
+        ClientState state = ClientState.forInternalCalls();
+        state.login(new 
AuthenticatedUser(CassandraRoleManager.DEFAULT_SUPERUSER_NAME));
+        return new QueryState(state);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/cql3/statements/DropIdentityStatementTest.java 
b/test/unit/org/apache/cassandra/cql3/statements/DropIdentityStatementTest.java
new file mode 100644
index 0000000000..7521c95e85
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/cql3/statements/DropIdentityStatementTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.statements;
+
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.cassandra.auth.AuthTestUtils;
+import org.apache.cassandra.auth.AuthenticatedUser;
+import org.apache.cassandra.auth.RoleResource;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnauthorizedException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+
+import static org.apache.cassandra.auth.AuthKeyspace.IDENTITY_TO_ROLES;
+import static 
org.apache.cassandra.cql3.statements.AddIdentityStatementTest.defineSchema;
+import static 
org.apache.cassandra.cql3.statements.AddIdentityStatementTest.getClientState;
+import static org.apache.cassandra.schema.SchemaConstants.AUTH_KEYSPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class DropIdentityStatementTest
+{
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    private static final String  IDENTITY = 
"spiffe://testdomain.com/testIdentifier/testValue";
+    private static final String DROP_QUERY = String.format("DROP IDENTITY 
'%s'", IDENTITY);
+
+    @BeforeClass
+    public static void beforeClasss() throws ConfigurationException
+    {
+        defineSchema();
+    }
+
+    @Before
+    public void clear()
+    {
+        
Keyspace.open(AUTH_KEYSPACE_NAME).getColumnFamilyStore(IDENTITY_TO_ROLES).truncateBlocking();
+    }
+
+
+    @Test
+    public void testDropIdentityStatementParsing()
+    {
+        CQLStatement.Raw statement = QueryProcessor.parseStatement(DROP_QUERY);
+        assertTrue(statement instanceof DropIdentityStatement);
+        DropIdentityStatement dropIdentityStatement = (DropIdentityStatement) 
statement;
+        assertEquals(IDENTITY, dropIdentityStatement.identity);
+        assertFalse(dropIdentityStatement.ifExists);
+    }
+
+    @Test
+    public void testDroppingValidIdentity()
+    {
+        DatabaseDescriptor.getRoleManager().addIdentity(IDENTITY, "cassandra");
+        QueryProcessor.process(DROP_QUERY, ConsistencyLevel.QUORUM, 
getClientState(), 10L);
+        
assertFalse(DatabaseDescriptor.getRoleManager().isExistingIdentity(IDENTITY));
+    }
+
+    @Test
+    public void testAnonymousUser()
+    {
+        expectedException.expect(UnauthorizedException.class);
+        expectedException.expectMessage("You have not logged in");
+        QueryProcessor.executeInternal(DROP_QUERY);
+    }
+
+    @Test
+    public void testUsersWithoutPrevilegesCannotDropIdentities()
+    {
+        // Added user to roles table
+        AuthenticatedUser authenticatedUser = new 
AuthenticatedUser("readwrite_user");
+        DatabaseDescriptor.getRoleManager().createRole(authenticatedUser, 
RoleResource.role("readwrite_user"), AuthTestUtils.getLoginRoleOptions());
+        ClientState state = ClientState.forInternalCalls();
+        state.login(authenticatedUser);
+
+        String query = String.format("DROP IDENTITY '%s';", IDENTITY);
+        expectedException.expect(UnauthorizedException.class);
+        expectedException.expectMessage("User readwrite_user does not have 
sufficient privileges to perform the requested operation");
+        QueryProcessor.process(query, ConsistencyLevel.QUORUM, new 
QueryState(state), 10L);
+    }
+
+    @Test
+    public void dropRoleSholdRemoveAllAssociatedIdentities()
+    {
+        // Create a new user and associate identities to the user
+        AuthenticatedUser authenticatedUser = new 
AuthenticatedUser("readwrite_user");
+        DatabaseDescriptor.getRoleManager().createRole(authenticatedUser, 
RoleResource.role("readwrite_user"), AuthTestUtils.getLoginRoleOptions());
+        DatabaseDescriptor.getRoleManager().addIdentity(IDENTITY, 
"readwrite_user");
+        String identity1 = "spiffe://testdomain.com/testIdentifier/testValue1";
+        DatabaseDescriptor.getRoleManager().addIdentity(identity1, 
"readwrite_user");
+
+        // Dropping role should remove identities associated with it
+        String query = "DROP ROLE readwrite_user";
+        QueryProcessor.process(query,  ConsistencyLevel.QUORUM, 
getClientState(), 10L);
+
+        Map<String, String > m = 
DatabaseDescriptor.getRoleManager().authorizedIdentities();
+        assertFalse(m.containsKey(IDENTITY));
+        assertFalse(m.containsKey(identity1));
+    }
+
+    @Test
+    public void ifExistsTest()
+    {
+        // Assert that identity is not present in the table
+        
assertNull(DatabaseDescriptor.getRoleManager().roleForIdentity(IDENTITY));
+
+        String dropQueryWithIfExists = String.format("DROP IDENTITY IF EXISTS 
'%s'", IDENTITY);
+
+        // Identity in the table & IF EXISTS in query should succeed
+        DatabaseDescriptor.getRoleManager().addIdentity(IDENTITY, "cassandra");
+        QueryProcessor.process(dropQueryWithIfExists, ConsistencyLevel.QUORUM, 
getClientState(), 10L);
+
+        // Identity not in the table & IF EXISTS in query should succeed
+        QueryProcessor.process(dropQueryWithIfExists, ConsistencyLevel.QUORUM, 
getClientState(), 10L);
+
+        String dropQueryWithOutIfExists = String.format("DROP IDENTITY '%s'", 
IDENTITY);
+        // Identity in the table & no IF EXISTS in query should succeed
+        DatabaseDescriptor.getRoleManager().addIdentity(IDENTITY, "cassandra");
+        QueryProcessor.process(dropQueryWithOutIfExists, 
ConsistencyLevel.QUORUM, getClientState(), 10L);
+
+        // Identity not in the table & no IF EXISTS in query should fail
+        expectedException.expect(InvalidRequestException.class);
+        expectedException.expectMessage(String.format("identity '%s' doesn't 
exist", IDENTITY));
+        QueryProcessor.process(dropQueryWithOutIfExists, 
ConsistencyLevel.QUORUM, getClientState(), 10L);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java 
b/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java
index 8bc9f28dd2..6b7323c9ab 100644
--- a/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java
+++ b/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java
@@ -64,7 +64,7 @@ public class CQLUserAuditTest
     public static void setup() throws Exception
     {
         OverrideConfigurationLoader.override((config) -> {
-            config.authenticator = "PasswordAuthenticator";
+            config.authenticator = new 
ParameterizedClass("PasswordAuthenticator");
             config.role_manager = "CassandraRoleManager";
             config.diagnostic_events_enabled = true;
             config.audit_logging_options.enabled = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to