Add diagnostic events for user audit logging

patch by Stefan Podkowinski; reviewed by Mick Semb Wever for CASSANDRA-13668


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d8c45192
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d8c45192
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d8c45192

Branch: refs/heads/trunk
Commit: d8c451923185841ca28e8cb1177b71edafbfd988
Parents: a79e590
Author: Stefan Podkowinski <stefan.podkowin...@1und1.de>
Authored: Fri Apr 6 09:49:38 2018 +0200
Committer: Stefan Podkowinski <stefan.podkowin...@1und1.de>
Committed: Fri Aug 17 14:08:37 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/audit/AuditEvent.java  |  75 ++++++
 .../audit/DiagnosticEventAuditLogger.java       |  39 +++
 .../cassandra/transport/CQLUserAuditTest.java   | 253 +++++++++++++++++++
 4 files changed, 368 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d8c45192/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 097e7dd..d2970a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add diagnostic events for user audit logging (CASSANDRA-13668)
  * Allow retrieving diagnostic events via JMX (CASSANDRA-14435)
  * Add base classes for diagnostic events (CASSANDRA-13457)
  * Clear view system metadata when dropping keyspace (CASSANDRA-14646)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d8c45192/src/java/org/apache/cassandra/audit/AuditEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditEvent.java 
b/src/java/org/apache/cassandra/audit/AuditEvent.java
new file mode 100644
index 0000000..b21fe58
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/AuditEvent.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.diag.DiagnosticEvent;
+import org.apache.cassandra.diag.DiagnosticEventService;
+
+/**
+ * {@Link AuditLogEntry} wrapper to expose audit events as {@link 
DiagnosticEvent}s.
+ */
+public final class AuditEvent extends DiagnosticEvent
+{
+    private final AuditLogEntry entry;
+
+    private AuditEvent(AuditLogEntry entry)
+    {
+        this.entry = entry;
+    }
+
+    static void create(AuditLogEntry entry)
+    {
+        if (isEnabled(entry.getType()))
+            DiagnosticEventService.instance().publish(new AuditEvent(entry));
+    }
+
+    private static boolean isEnabled(AuditLogEntryType type)
+    {
+        return DiagnosticEventService.instance().isEnabled(AuditEvent.class, 
type);
+    }
+
+    public Enum<?> getType()
+    {
+        return entry.getType();
+    }
+
+    public String getSource()
+    {
+        return entry.getSource().toString(true);
+    }
+
+    public AuditLogEntry getEntry()
+    {
+        return entry;
+    }
+
+    public Map<String, Serializable> toMap()
+    {
+        HashMap<String, Serializable> ret = new HashMap<>();
+        if (entry.getKeyspace() != null) ret.put("keyspace", 
entry.getKeyspace());
+        if (entry.getOperation() != null) ret.put("operation", 
entry.getOperation());
+        if (entry.getScope() != null) ret.put("scope", entry.getScope());
+        if (entry.getUser() != null) ret.put("user", entry.getUser());
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d8c45192/src/java/org/apache/cassandra/audit/DiagnosticEventAuditLogger.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/audit/DiagnosticEventAuditLogger.java 
b/src/java/org/apache/cassandra/audit/DiagnosticEventAuditLogger.java
new file mode 100644
index 0000000..9d586ba
--- /dev/null
+++ b/src/java/org/apache/cassandra/audit/DiagnosticEventAuditLogger.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.audit;
+
+import org.apache.cassandra.diag.DiagnosticEventService;
+
+public class DiagnosticEventAuditLogger implements IAuditLogger
+{
+    public void log(AuditLogEntry logMessage)
+    {
+        AuditEvent.create(logMessage);
+    }
+
+    public boolean enabled()
+    {
+        return DiagnosticEventService.instance().isDiagnosticsEnabled();
+    }
+
+    public void stop()
+    {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d8c45192/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java 
b/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java
new file mode 100644
index 0000000..82becc7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.audit.AuditEvent;
+import org.apache.cassandra.audit.AuditLogEntryType;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.OverrideConfigurationLoader;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class CQLUserAuditTest
+{
+    private static EmbeddedCassandraService embedded;
+    private static final BlockingQueue<AuditEvent> auditEvents = new 
LinkedBlockingQueue<>();
+
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        OverrideConfigurationLoader.override((config) -> {
+            config.authenticator = "PasswordAuthenticator";
+            config.role_manager = "CassandraRoleManager";
+            config.diagnostic_events_enabled = true;
+            config.audit_logging_options.enabled = true;
+            config.audit_logging_options.logger = "DiagnosticEventAuditLogger";
+        });
+        CQLTester.prepareServer();
+
+        System.setProperty("cassandra.superuser_setup_delay_ms", "0");
+        embedded = new EmbeddedCassandraService();
+        embedded.start();
+
+        executeAs(Arrays.asList("CREATE ROLE testuser WITH LOGIN = true AND 
SUPERUSER = false AND PASSWORD = 'foo'",
+                                "CREATE ROLE testuser_nologin WITH LOGIN = 
false AND SUPERUSER = false AND PASSWORD = 'foo'",
+                                "CREATE KEYSPACE testks WITH replication = 
{'class': 'SimpleStrategy', 'replication_factor': '1'}",
+                                "CREATE TABLE testks.table1 (a text, b int, c 
int, PRIMARY KEY (a, b))",
+                                "CREATE TABLE testks.table2 (a text, b int, c 
int, PRIMARY KEY (a, b))"),
+                  "cassandra", "cassandra", null);
+
+        DiagnosticEventService.instance().subscribe(AuditEvent.class, 
auditEvents::add);
+    }
+
+    @AfterClass
+    public static void shutdown()
+    {
+        embedded.stop();
+    }
+
+    @After
+    public void clearQueue()
+    {
+        auditEvents.clear();
+    }
+
+    @Test
+    public void loginWrongPasswordTest() throws Throwable
+    {
+        executeAs(Collections.emptyList(), "testuser", "wrongpassword", 
AuditLogEntryType.LOGIN_ERROR);
+    }
+
+    @Test
+    public void loginWrongUsernameTest() throws Throwable
+    {
+        executeAs(Collections.emptyList(), "wronguser", "foo", 
AuditLogEntryType.LOGIN_ERROR);
+    }
+
+    @Test
+    public void loginDeniedTest() throws Throwable
+    {
+        executeAs(Collections.emptyList(), "testuser_nologin", "foo", 
AuditLogEntryType.LOGIN_ERROR);
+    }
+
+    @Test
+    public void loginSuccessfulTest() throws Throwable
+    {
+        executeAs(Collections.emptyList(), "testuser", "foo", 
AuditLogEntryType.LOGIN_SUCCESS);
+    }
+
+    @Test
+    public void querySimpleSelect() throws Throwable
+    {
+        ArrayList<AuditEvent> events = executeAs(Arrays.asList("SELECT * FROM 
testks.table1"),
+                                                 "testuser", "foo", 
AuditLogEntryType.LOGIN_SUCCESS);
+        assertEquals(1, events.size());
+        AuditEvent e = events.get(0);
+        Map<String, Serializable> m = e.toMap();
+        assertEquals("testuser", m.get("user"));
+        assertEquals("SELECT * FROM testks.table1", m.get("operation"));
+        assertEquals("testks", m.get("keyspace"));
+        assertEquals("table1", m.get("scope"));
+        assertEquals(AuditLogEntryType.SELECT, e.getType());
+    }
+
+    @Test
+    public void queryInsert() throws Throwable
+    {
+        ArrayList<AuditEvent> events = executeAs(Arrays.asList("INSERT INTO 
testks.table1 (a, b, c) VALUES ('a', 1, 1)"),
+                                                 "testuser", "foo", 
AuditLogEntryType.LOGIN_SUCCESS);
+        assertEquals(1, events.size());
+        AuditEvent e = events.get(0);
+        Map<String, Serializable> m = e.toMap();
+        assertEquals("testuser", m.get("user"));
+        assertEquals("INSERT INTO testks.table1 (a, b, c) VALUES ('a', 1, 1)", 
m.get("operation"));
+        assertEquals("testks", m.get("keyspace"));
+        assertEquals("table1", m.get("scope"));
+        assertEquals(AuditLogEntryType.UPDATE, e.getType());
+    }
+
+    @Test
+    public void queryBatch() throws Throwable
+    {
+        String query = "BEGIN BATCH "
+                       + "INSERT INTO testks.table1 (a, b, c) VALUES ('a', 1, 
1); "
+                       + "INSERT INTO testks.table1 (a, b, c) VALUES ('b', 1, 
1); "
+                       + "INSERT INTO testks.table1 (a, b, c) VALUES ('b', 2, 
2); "
+                       + "APPLY BATCH;";
+        ArrayList<AuditEvent> events = executeAs(Arrays.asList(query),
+                                                 "testuser", "foo",
+                                                 
AuditLogEntryType.LOGIN_SUCCESS);
+        assertEquals(1, events.size());
+        AuditEvent e = events.get(0);
+        Map<String, Serializable> m = e.toMap();
+        assertEquals("testuser", m.get("user"));
+        assertEquals(query, m.get("operation"));
+        assertEquals(AuditLogEntryType.BATCH, e.getType());
+    }
+
+    @Test
+    public void prepareStmt()
+    {
+        Cluster cluster = 
Cluster.builder().addContactPoints(InetAddress.getLoopbackAddress())
+                                 .withoutJMXReporting()
+                                 .withCredentials("testuser", "foo")
+                                 
.withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+        String spStmt = "INSERT INTO testks.table1 (a, b, c) VALUES (?, ?, ?)";
+        try (Session session = cluster.connect())
+        {
+            PreparedStatement pStmt = session.prepare(spStmt);
+            session.execute(pStmt.bind("x", 9, 8));
+        }
+
+        List<AuditEvent> events = auditEvents.stream().filter((e) -> 
e.getType() != AuditLogEntryType.LOGIN_SUCCESS)
+                                             .collect(Collectors.toList());
+        AuditEvent e = events.get(0);
+        Map<String, Serializable> m = e.toMap();
+        assertEquals(2, events.size());
+        assertEquals("testuser", m.get("user"));
+        assertEquals(spStmt, m.get("operation"));
+        assertEquals("testks", m.get("keyspace"));
+        assertEquals("table1", m.get("scope"));
+        assertEquals(AuditLogEntryType.PREPARE_STATEMENT, e.getType());
+
+        e = events.get(1);
+        m = e.toMap();
+        assertEquals("testuser", m.get("user"));
+        assertEquals(spStmt, m.get("operation"));
+        assertEquals("testks", m.get("keyspace"));
+        assertEquals("table1", m.get("scope"));
+        assertEquals(AuditLogEntryType.UPDATE, e.getType());
+
+    }
+
+    private static ArrayList<AuditEvent> executeAs(List<String> queries, 
String username, String password,
+                                                   AuditLogEntryType 
expectedAuthType) throws Exception
+    {
+        boolean authFailed = false;
+        Cluster cluster = 
Cluster.builder().addContactPoints(InetAddress.getLoopbackAddress())
+                                 .withoutJMXReporting()
+                                 .withCredentials(username, password)
+                                 
.withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+        try (Session session = cluster.connect())
+        {
+            for (String query : queries)
+                session.execute(query);
+        }
+        catch (AuthenticationException e)
+        {
+            authFailed = true;
+        }
+        cluster.close();
+
+        if (expectedAuthType == null) return null;
+
+        AuditEvent event = auditEvents.poll(100, TimeUnit.MILLISECONDS);
+        assertEquals(expectedAuthType, event.getType());
+        assertTrue(!authFailed || event.getType() == 
AuditLogEntryType.LOGIN_ERROR);
+        assertEquals(InetAddressAndPort.getLoopbackAddress().address,
+                     event.getEntry().getSource().address);
+        assertTrue(event.getEntry().getSource().port > 0);
+        if (event.getType() != AuditLogEntryType.LOGIN_ERROR)
+            assertEquals(username, event.toMap().get("user"));
+
+        // drain all remaining login related events, as there's no 
specification how connections and login attempts
+        // should be handled by the driver, so we can't assert a fixed number 
of login events
+        for (AuditEvent e = auditEvents.peek();
+             e != null && (e.getType() == AuditLogEntryType.LOGIN_ERROR
+                           || e.getType() == AuditLogEntryType.LOGIN_SUCCESS);
+             e = auditEvents.peek())
+        {
+            auditEvents.remove(e);
+        }
+
+        ArrayList<AuditEvent> ret = new ArrayList<>(auditEvents.size());
+        auditEvents.drainTo(ret);
+        return ret;
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to