Repository: cassandra Updated Branches: refs/heads/cassandra-3.11 bb7e522b4 -> 9562b9b69 refs/heads/trunk 26e025804 -> 9c6f87c35
Properly evict pstmts from prepared statements cache patch by Robert Stupp; reviewed by Benjamin Lerer for CASSANDRA-13641 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9562b9b6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9562b9b6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9562b9b6 Branch: refs/heads/cassandra-3.11 Commit: 9562b9b69e08b84ec1e8e431a846548fa8a83b44 Parents: bb7e522 Author: Robert Stupp <sn...@snazy.de> Authored: Wed Jun 28 21:15:03 2017 +0200 Committer: Robert Stupp <sn...@snazy.de> Committed: Wed Jun 28 21:15:03 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cql3/QueryProcessor.java | 9 +- .../org/apache/cassandra/db/SystemKeyspace.java | 6 ++ test/conf/cassandra.yaml | 1 + .../cassandra/cql3/PstmtPersistenceTest.java | 108 ++++++++++++++----- 5 files changed, 100 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4297a15..88aa1ef 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.11.1 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641) Merged from 3.0: * Fix secondary index queries on COMPACT tables (CASSANDRA-13627) * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index f5ce7e4..0e0ba3c 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -88,6 +88,7 @@ public class QueryProcessor implements QueryHandler .listener((md5Digest, prepared) -> { metrics.preparedStatementsEvicted.inc(); lastMinuteEvictionsCount.incrementAndGet(); + SystemKeyspace.removePreparedStatement(md5Digest); }).build(); thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, ParsedStatement.Prepared>() @@ -162,11 +163,17 @@ public class QueryProcessor implements QueryHandler logger.info("Preloaded {} prepared statements", count); } + /** + * Clears the prepared statement cache. + * @param memoryOnly {@code true} if only the in memory caches must be cleared, {@code false} otherwise. + */ @VisibleForTesting - public static void clearPrepraredStatements() + public static void clearPreparedStatements(boolean memoryOnly) { preparedStatements.clear(); thriftPreparedStatements.clear(); + if (!memoryOnly) + SystemKeyspace.resetPreparedStatements(); } private static QueryState internalQueryState() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 82c9752..6c45329 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -1488,6 +1488,12 @@ public final class SystemKeyspace key.byteBuffer()); } + public static void resetPreparedStatements() + { + ColumnFamilyStore availableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(PREPARED_STATEMENTS); + availableRanges.truncateBlocking(); + } + public static List<Pair<String, String>> loadPreparedStatements() { String query = String.format("SELECT logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/test/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index cf02634..96ca9a0 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -44,3 +44,4 @@ row_cache_class_name: org.apache.cassandra.cache.OHCProvider row_cache_size_in_mb: 16 enable_user_defined_functions: true enable_scripted_user_defined_functions: true +prepared_statements_cache_size_mb: 1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java index 380dbda..e7adc8e 100644 --- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java +++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.junit.Before; import org.junit.Test; import junit.framework.Assert; @@ -36,16 +37,23 @@ import org.apache.cassandra.service.QueryState; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.MD5Digest; +import static org.junit.Assert.*; + public class PstmtPersistenceTest extends CQLTester { + @Before + public void setUp() + { + QueryProcessor.clearPreparedStatements(false); + } + @Test public void testCachedPreparedStatements() throws Throwable { // need this for pstmt execution/validation tests requireNetwork(); - int rows = QueryProcessor.executeOnceInternal("SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).size(); - Assert.assertEquals(0, rows); + assertEquals(0, numberOfStatementsOnDisk()); execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}"); execute("CREATE TABLE foo.bar (key text PRIMARY KEY, val int)"); @@ -56,30 +64,27 @@ public class PstmtPersistenceTest extends CQLTester List<MD5Digest> stmtIds = new ArrayList<>(); // #0 - stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + SchemaConstants.SCHEMA_KEYSPACE_NAME + '.' + SchemaKeyspace.TABLES + " WHERE keyspace_name = ?", clientState, false).statementId); + stmtIds.add(prepareStatement("SELECT * FROM %s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES, clientState)); // #1 - stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId); + stmtIds.add(prepareStatement("SELECT * FROM %s WHERE pk = ?", clientState)); // #2 - stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId); + stmtIds.add(prepareStatement("SELECT * FROM %s WHERE key = ?", "foo", "bar", clientState)); clientState.setKeyspace("foo"); // #3 - stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId); + stmtIds.add(prepareStatement("SELECT * FROM %s WHERE pk = ?", clientState)); // #4 - stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId); + stmtIds.add(prepareStatement("SELECT * FROM %S WHERE key = ?", "foo", "bar", clientState)); - Assert.assertEquals(5, stmtIds.size()); - Assert.assertEquals(5, QueryProcessor.preparedStatementsCount()); - - String queryAll = "SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS; + assertEquals(5, stmtIds.size()); + assertEquals(5, QueryProcessor.preparedStatementsCount()); - rows = QueryProcessor.executeOnceInternal(queryAll).size(); - Assert.assertEquals(5, rows); + Assert.assertEquals(5, numberOfStatementsOnDisk()); QueryHandler handler = ClientState.getCQLQueryHandler(); validatePstmts(stmtIds, handler); // clear prepared statements cache - QueryProcessor.clearPrepraredStatements(); + QueryProcessor.clearPreparedStatements(true); Assert.assertEquals(0, QueryProcessor.preparedStatementsCount()); for (MD5Digest stmtId : stmtIds) Assert.assertNull(handler.getPrepared(stmtId)); @@ -88,7 +93,9 @@ public class PstmtPersistenceTest extends CQLTester QueryProcessor.preloadPreparedStatement(); validatePstmts(stmtIds, handler); + // validate that the prepared statements are in the system table + String queryAll = "SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS; for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(queryAll)) { MD5Digest digest = MD5Digest.wrap(ByteBufferUtil.getArray(row.getBytes("prepared_id"))); @@ -97,22 +104,19 @@ public class PstmtPersistenceTest extends CQLTester } // add anther prepared statement and sync it to table - QueryProcessor.prepare("SELECT * FROM bar WHERE key = ?", clientState, false); - Assert.assertEquals(6, QueryProcessor.preparedStatementsCount()); - rows = QueryProcessor.executeOnceInternal(queryAll).size(); - Assert.assertEquals(6, rows); + prepareStatement("SELECT * FROM %s WHERE key = ?", "foo", "bar", clientState); + assertEquals(6, numberOfStatementsInMemory()); + assertEquals(6, numberOfStatementsOnDisk()); // drop a keyspace (prepared statements are removed - syncPreparedStatements() remove should the rows, too) execute("DROP KEYSPACE foo"); - Assert.assertEquals(3, QueryProcessor.preparedStatementsCount()); - rows = QueryProcessor.executeOnceInternal(queryAll).size(); - Assert.assertEquals(3, rows); - + assertEquals(3, numberOfStatementsInMemory()); + assertEquals(3, numberOfStatementsOnDisk()); } private void validatePstmts(List<MD5Digest> stmtIds, QueryHandler handler) { - Assert.assertEquals(5, QueryProcessor.preparedStatementsCount()); + assertEquals(5, QueryProcessor.preparedStatementsCount()); QueryOptions optionsStr = QueryOptions.forInternalCalls(Collections.singletonList(UTF8Type.instance.fromString("foobar"))); QueryOptions optionsInt = QueryOptions.forInternalCalls(Collections.singletonList(Int32Type.instance.decompose(42))); validatePstmt(handler, stmtIds.get(0), optionsStr); @@ -125,7 +129,63 @@ public class PstmtPersistenceTest extends CQLTester private static void validatePstmt(QueryHandler handler, MD5Digest stmtId, QueryOptions options) { ParsedStatement.Prepared prepared = handler.getPrepared(stmtId); - Assert.assertNotNull(prepared); + assertNotNull(prepared); handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap(), System.nanoTime()); } + + @Test + public void testPstmtInvalidation() throws Throwable + { + ClientState clientState = ClientState.forInternalCalls(); + + createTable("CREATE TABLE %s (key int primary key, val int)"); + + for (int cnt = 1; cnt < 10000; cnt++) + { + prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState); + + if (numberOfEvictedStatements() > 0) + { + assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk()); + + // prepare a more statements to trigger more evictions + for (int cnt2 = 1; cnt2 < 10; cnt2++) + prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt2, clientState); + + // each new prepared statement should have caused an eviction + assertEquals("eviction count didn't increase by the expected number", numberOfEvictedStatements(), 10); + assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk()); + + return; + } + } + + fail("Prepared statement eviction does not work"); + } + + private long numberOfStatementsOnDisk() throws Throwable + { + UntypedResultSet.Row row = execute("SELECT COUNT(*) FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).one(); + return row.getLong("count"); + } + + private long numberOfStatementsInMemory() + { + return QueryProcessor.preparedStatementsCount(); + } + + private long numberOfEvictedStatements() + { + return QueryProcessor.metrics.preparedStatementsEvicted.getCount(); + } + + private MD5Digest prepareStatement(String stmt, ClientState clientState) + { + return prepareStatement(stmt, keyspace(), currentTable(), clientState); + } + + private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState) + { + return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState, false).statementId; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org