Add diagnostic events for user audit logging patch by Stefan Podkowinski; reviewed by Mick Semb Wever for CASSANDRA-13668
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d8c45192 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d8c45192 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d8c45192 Branch: refs/heads/trunk Commit: d8c451923185841ca28e8cb1177b71edafbfd988 Parents: a79e590 Author: Stefan Podkowinski <stefan.podkowin...@1und1.de> Authored: Fri Apr 6 09:49:38 2018 +0200 Committer: Stefan Podkowinski <stefan.podkowin...@1und1.de> Committed: Fri Aug 17 14:08:37 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/audit/AuditEvent.java | 75 ++++++ .../audit/DiagnosticEventAuditLogger.java | 39 +++ .../cassandra/transport/CQLUserAuditTest.java | 253 +++++++++++++++++++ 4 files changed, 368 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d8c45192/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 097e7dd..d2970a4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add diagnostic events for user audit logging (CASSANDRA-13668) * Allow retrieving diagnostic events via JMX (CASSANDRA-14435) * Add base classes for diagnostic events (CASSANDRA-13457) * Clear view system metadata when dropping keyspace (CASSANDRA-14646) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d8c45192/src/java/org/apache/cassandra/audit/AuditEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/AuditEvent.java b/src/java/org/apache/cassandra/audit/AuditEvent.java new file mode 100644 index 0000000..b21fe58 --- /dev/null +++ b/src/java/org/apache/cassandra/audit/AuditEvent.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.audit; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.cassandra.diag.DiagnosticEvent; +import org.apache.cassandra.diag.DiagnosticEventService; + +/** + * {@Link AuditLogEntry} wrapper to expose audit events as {@link DiagnosticEvent}s. + */ +public final class AuditEvent extends DiagnosticEvent +{ + private final AuditLogEntry entry; + + private AuditEvent(AuditLogEntry entry) + { + this.entry = entry; + } + + static void create(AuditLogEntry entry) + { + if (isEnabled(entry.getType())) + DiagnosticEventService.instance().publish(new AuditEvent(entry)); + } + + private static boolean isEnabled(AuditLogEntryType type) + { + return DiagnosticEventService.instance().isEnabled(AuditEvent.class, type); + } + + public Enum<?> getType() + { + return entry.getType(); + } + + public String getSource() + { + return entry.getSource().toString(true); + } + + public AuditLogEntry getEntry() + { + return entry; + } + + public Map<String, Serializable> toMap() + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (entry.getKeyspace() != null) ret.put("keyspace", entry.getKeyspace()); + if (entry.getOperation() != null) ret.put("operation", entry.getOperation()); + if (entry.getScope() != null) ret.put("scope", entry.getScope()); + if (entry.getUser() != null) ret.put("user", entry.getUser()); + return ret; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d8c45192/src/java/org/apache/cassandra/audit/DiagnosticEventAuditLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/DiagnosticEventAuditLogger.java b/src/java/org/apache/cassandra/audit/DiagnosticEventAuditLogger.java new file mode 100644 index 0000000..9d586ba --- /dev/null +++ b/src/java/org/apache/cassandra/audit/DiagnosticEventAuditLogger.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.audit; + +import org.apache.cassandra.diag.DiagnosticEventService; + +public class DiagnosticEventAuditLogger implements IAuditLogger +{ + public void log(AuditLogEntry logMessage) + { + AuditEvent.create(logMessage); + } + + public boolean enabled() + { + return DiagnosticEventService.instance().isDiagnosticsEnabled(); + } + + public void stop() + { + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d8c45192/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java b/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java new file mode 100644 index 0000000..82becc7 --- /dev/null +++ b/test/unit/org/apache/cassandra/transport/CQLUserAuditTest.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport; + +import java.io.Serializable; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.audit.AuditEvent; +import org.apache.cassandra.audit.AuditLogEntryType; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.OverrideConfigurationLoader; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.diag.DiagnosticEventService; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.EmbeddedCassandraService; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +@RunWith(OrderedJUnit4ClassRunner.class) +public class CQLUserAuditTest +{ + private static EmbeddedCassandraService embedded; + private static final BlockingQueue<AuditEvent> auditEvents = new LinkedBlockingQueue<>(); + + @BeforeClass + public static void setup() throws Exception + { + OverrideConfigurationLoader.override((config) -> { + config.authenticator = "PasswordAuthenticator"; + config.role_manager = "CassandraRoleManager"; + config.diagnostic_events_enabled = true; + config.audit_logging_options.enabled = true; + config.audit_logging_options.logger = "DiagnosticEventAuditLogger"; + }); + CQLTester.prepareServer(); + + System.setProperty("cassandra.superuser_setup_delay_ms", "0"); + embedded = new EmbeddedCassandraService(); + embedded.start(); + + executeAs(Arrays.asList("CREATE ROLE testuser WITH LOGIN = true AND SUPERUSER = false AND PASSWORD = 'foo'", + "CREATE ROLE testuser_nologin WITH LOGIN = false AND SUPERUSER = false AND PASSWORD = 'foo'", + "CREATE KEYSPACE testks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", + "CREATE TABLE testks.table1 (a text, b int, c int, PRIMARY KEY (a, b))", + "CREATE TABLE testks.table2 (a text, b int, c int, PRIMARY KEY (a, b))"), + "cassandra", "cassandra", null); + + DiagnosticEventService.instance().subscribe(AuditEvent.class, auditEvents::add); + } + + @AfterClass + public static void shutdown() + { + embedded.stop(); + } + + @After + public void clearQueue() + { + auditEvents.clear(); + } + + @Test + public void loginWrongPasswordTest() throws Throwable + { + executeAs(Collections.emptyList(), "testuser", "wrongpassword", AuditLogEntryType.LOGIN_ERROR); + } + + @Test + public void loginWrongUsernameTest() throws Throwable + { + executeAs(Collections.emptyList(), "wronguser", "foo", AuditLogEntryType.LOGIN_ERROR); + } + + @Test + public void loginDeniedTest() throws Throwable + { + executeAs(Collections.emptyList(), "testuser_nologin", "foo", AuditLogEntryType.LOGIN_ERROR); + } + + @Test + public void loginSuccessfulTest() throws Throwable + { + executeAs(Collections.emptyList(), "testuser", "foo", AuditLogEntryType.LOGIN_SUCCESS); + } + + @Test + public void querySimpleSelect() throws Throwable + { + ArrayList<AuditEvent> events = executeAs(Arrays.asList("SELECT * FROM testks.table1"), + "testuser", "foo", AuditLogEntryType.LOGIN_SUCCESS); + assertEquals(1, events.size()); + AuditEvent e = events.get(0); + Map<String, Serializable> m = e.toMap(); + assertEquals("testuser", m.get("user")); + assertEquals("SELECT * FROM testks.table1", m.get("operation")); + assertEquals("testks", m.get("keyspace")); + assertEquals("table1", m.get("scope")); + assertEquals(AuditLogEntryType.SELECT, e.getType()); + } + + @Test + public void queryInsert() throws Throwable + { + ArrayList<AuditEvent> events = executeAs(Arrays.asList("INSERT INTO testks.table1 (a, b, c) VALUES ('a', 1, 1)"), + "testuser", "foo", AuditLogEntryType.LOGIN_SUCCESS); + assertEquals(1, events.size()); + AuditEvent e = events.get(0); + Map<String, Serializable> m = e.toMap(); + assertEquals("testuser", m.get("user")); + assertEquals("INSERT INTO testks.table1 (a, b, c) VALUES ('a', 1, 1)", m.get("operation")); + assertEquals("testks", m.get("keyspace")); + assertEquals("table1", m.get("scope")); + assertEquals(AuditLogEntryType.UPDATE, e.getType()); + } + + @Test + public void queryBatch() throws Throwable + { + String query = "BEGIN BATCH " + + "INSERT INTO testks.table1 (a, b, c) VALUES ('a', 1, 1); " + + "INSERT INTO testks.table1 (a, b, c) VALUES ('b', 1, 1); " + + "INSERT INTO testks.table1 (a, b, c) VALUES ('b', 2, 2); " + + "APPLY BATCH;"; + ArrayList<AuditEvent> events = executeAs(Arrays.asList(query), + "testuser", "foo", + AuditLogEntryType.LOGIN_SUCCESS); + assertEquals(1, events.size()); + AuditEvent e = events.get(0); + Map<String, Serializable> m = e.toMap(); + assertEquals("testuser", m.get("user")); + assertEquals(query, m.get("operation")); + assertEquals(AuditLogEntryType.BATCH, e.getType()); + } + + @Test + public void prepareStmt() + { + Cluster cluster = Cluster.builder().addContactPoints(InetAddress.getLoopbackAddress()) + .withoutJMXReporting() + .withCredentials("testuser", "foo") + .withPort(DatabaseDescriptor.getNativeTransportPort()).build(); + String spStmt = "INSERT INTO testks.table1 (a, b, c) VALUES (?, ?, ?)"; + try (Session session = cluster.connect()) + { + PreparedStatement pStmt = session.prepare(spStmt); + session.execute(pStmt.bind("x", 9, 8)); + } + + List<AuditEvent> events = auditEvents.stream().filter((e) -> e.getType() != AuditLogEntryType.LOGIN_SUCCESS) + .collect(Collectors.toList()); + AuditEvent e = events.get(0); + Map<String, Serializable> m = e.toMap(); + assertEquals(2, events.size()); + assertEquals("testuser", m.get("user")); + assertEquals(spStmt, m.get("operation")); + assertEquals("testks", m.get("keyspace")); + assertEquals("table1", m.get("scope")); + assertEquals(AuditLogEntryType.PREPARE_STATEMENT, e.getType()); + + e = events.get(1); + m = e.toMap(); + assertEquals("testuser", m.get("user")); + assertEquals(spStmt, m.get("operation")); + assertEquals("testks", m.get("keyspace")); + assertEquals("table1", m.get("scope")); + assertEquals(AuditLogEntryType.UPDATE, e.getType()); + + } + + private static ArrayList<AuditEvent> executeAs(List<String> queries, String username, String password, + AuditLogEntryType expectedAuthType) throws Exception + { + boolean authFailed = false; + Cluster cluster = Cluster.builder().addContactPoints(InetAddress.getLoopbackAddress()) + .withoutJMXReporting() + .withCredentials(username, password) + .withPort(DatabaseDescriptor.getNativeTransportPort()).build(); + try (Session session = cluster.connect()) + { + for (String query : queries) + session.execute(query); + } + catch (AuthenticationException e) + { + authFailed = true; + } + cluster.close(); + + if (expectedAuthType == null) return null; + + AuditEvent event = auditEvents.poll(100, TimeUnit.MILLISECONDS); + assertEquals(expectedAuthType, event.getType()); + assertTrue(!authFailed || event.getType() == AuditLogEntryType.LOGIN_ERROR); + assertEquals(InetAddressAndPort.getLoopbackAddress().address, + event.getEntry().getSource().address); + assertTrue(event.getEntry().getSource().port > 0); + if (event.getType() != AuditLogEntryType.LOGIN_ERROR) + assertEquals(username, event.toMap().get("user")); + + // drain all remaining login related events, as there's no specification how connections and login attempts + // should be handled by the driver, so we can't assert a fixed number of login events + for (AuditEvent e = auditEvents.peek(); + e != null && (e.getType() == AuditLogEntryType.LOGIN_ERROR + || e.getType() == AuditLogEntryType.LOGIN_SUCCESS); + e = auditEvents.peek()) + { + auditEvents.remove(e); + } + + ArrayList<AuditEvent> ret = new ArrayList<>(auditEvents.size()); + auditEvents.drainTo(ret); + return ret; + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org