Fix CQLSSTableWriter throwing exception and spawning threads. Patch by Benjamin Lerer; reviewed by yukim for CASSANDRA-8808
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fb67c41a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fb67c41a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fb67c41a Branch: refs/heads/cassandra-2.1 Commit: fb67c41ad7faeff7a5f33e9e0bca6493a3febe89 Parents: e56d9ef Author: blerer <benjamin.le...@datastax.com> Authored: Tue Mar 3 12:00:57 2015 +0100 Committer: Yuki Morishita <yu...@apache.org> Committed: Thu Mar 5 11:41:50 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cql3/statements/UpdateStatement.java | 23 ++++ .../cassandra/io/sstable/CQLSSTableWriter.java | 50 ++++++-- .../io/sstable/CQLSSTableWriterClientTest.java | 116 +++++++++++++++++++ .../io/sstable/CQLSSTableWriterTest.java | 31 +++-- 5 files changed, 198 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4e34c9e..faa14d5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -27,6 +27,8 @@ * 'nodetool info' prints exception against older node (CASSANDRA-8796) * Ensure SSTableSimpleUnsortedWriter.close() terminates if disk writer has crashed (CASSANDRA-8807) + * Fix CQLSSTableWriter throwing exception and spawning threads + (CASSANDRA-8808) 2.0.12: * Use more efficient slice size for querying internal secondary http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 594b5db..f34edaf 100644 --- 
a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -50,6 +50,15 @@ public class UpdateStatement extends ModificationStatement public void addUpdateForKey(ColumnFamily cf, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException { + addUpdateForKey(cf, key, builder, params, true); + } + + public void addUpdateForKey(ColumnFamily cf, + ByteBuffer key, + ColumnNameBuilder builder, + UpdateParameters params, + boolean validateIndexedColumns) throws InvalidRequestException + { CFDefinition cfDef = cfm.getCfDef(); if (builder.getLength() > FBUtilities.MAX_UNSIGNED_SHORT) @@ -106,6 +115,20 @@ public class UpdateStatement extends ModificationStatement update.execute(key, cf, builder.copy(), params); } + // validateIndexedColumns trigger a call to Keyspace.open() which we want to be able to avoid in some case + //(e.g. when using CQLSSTableWriter) + if (validateIndexedColumns) + validateIndexedColumns(cf); + } + + /** + * Checks that the value of the indexed columns is valid. 
+ * + * @param cf the column family + * @throws InvalidRequestException if one of the values is invalid + */ + private void validateIndexedColumns(ColumnFamily cf) throws InvalidRequestException + { SecondaryIndexManager indexManager = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId).indexManager; if (indexManager.hasIndexes()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 8006112..fb4c186 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -37,9 +37,9 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestValidationException; -import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.utils.Allocator; @@ -77,6 +77,11 @@ import org.apache.cassandra.utils.Pair; */ public class CQLSSTableWriter implements Closeable { + static + { + Config.setClientMode(true); + } + private final AbstractSSTableSimpleWriter writer; private final UpdateStatement insert; private final List<ColumnSpecification> boundNames; @@ -215,7 +220,7 @@ public class CQLSSTableWriter implements Closeable { if (writer.currentKey() == null || !key.equals(writer.currentKey().key)) writer.newRow(key); - insert.addUpdateForKey(writer.currentColumnFamily(), key, 
clusteringPrefix, params); + insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params, false); } return this; } @@ -345,19 +350,11 @@ public class CQLSSTableWriter implements Closeable KSMetaData ksm = Schema.instance.getKSMetaData(this.schema.ksName); if (ksm == null) { - ksm = KSMetaData.newKeyspace(this.schema.ksName, - AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"), - ImmutableMap.of("replication_factor", "1"), - true, - Collections.singleton(this.schema)); - Schema.instance.load(ksm); + createKeyspaceWithColumnFamily(this.schema); } else if (Schema.instance.getCFMetaData(this.schema.ksName, this.schema.cfName) == null) { - Schema.instance.load(this.schema); - ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(this.schema))); - Schema.instance.setKeyspaceDefinition(ksm); - Keyspace.open(ksm.name).initCf(this.schema.cfId, this.schema.cfName, false); + addColumnFamilyToKeyspace(ksm, this.schema); } return this; } @@ -369,6 +366,35 @@ public class CQLSSTableWriter implements Closeable } /** + * Adds the specified column family to the specified keyspace. + * + * @param ksm the keyspace meta data + * @param cfm the column family meta data + */ + private static void addColumnFamilyToKeyspace(KSMetaData ksm, CFMetaData cfm) + { + ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm))); + Schema.instance.load(cfm); + Schema.instance.setKeyspaceDefinition(ksm); + } + + /** + * Creates a keyspace for the specified column family. + * + * @param cfm the column family + * @throws ConfigurationException if a problem occurs while creating the keyspace. 
+ */ + private static void createKeyspaceWithColumnFamily(CFMetaData cfm) throws ConfigurationException + { + KSMetaData ksm = KSMetaData.newKeyspace(cfm.ksName, + AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"), + ImmutableMap.of("replication_factor", "1"), + true, + Collections.singleton(cfm)); + Schema.instance.load(ksm); + } + + /** * The partitioner to use. * <p> * By default, {@code Murmur3Partitioner} will be used. If this is not the partitioner used http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java new file mode 100644 index 0000000..d10c9fb --- /dev/null +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.io.sstable; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Arrays; + +import com.google.common.io.Files; + +import org.junit.*; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.io.util.FileUtils; + +import static org.junit.Assert.assertEquals; + +import static org.junit.Assert.assertTrue; + +public class CQLSSTableWriterClientTest +{ + private File testDirectory; + + @Before + public void setUp() + { + this.testDirectory = Files.createTempDir(); + } + + @After + public void tearDown() + { + FileUtils.deleteRecursive(this.testDirectory); + } + + @AfterClass + public static void cleanup() throws Exception + { + Config.setClientMode(false); + } + + @Test + public void testWriterInClientMode() throws IOException, InvalidRequestException + { + final String TABLE1 = "table1"; + final String TABLE2 = "table2"; + + String schema = "CREATE TABLE client_test.%s (" + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int" + + ")"; + String insert = "INSERT INTO client_test.%s (k, v1, v2) VALUES (?, ?, ?)"; + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(this.testDirectory) + .forTable(String.format(schema, TABLE1)) + .using(String.format(insert, TABLE1)).build(); + + CQLSSTableWriter writer2 = CQLSSTableWriter.builder() + .inDirectory(this.testDirectory) + .forTable(String.format(schema, TABLE2)) + .using(String.format(insert, TABLE2)).build(); + + writer.addRow(0, "A", 0); + writer2.addRow(0, "A", 0); + writer.addRow(1, "B", 1); + writer2.addRow(1, "B", 1); + writer.close(); + writer2.close(); + + assertContainsDataFiles(this.testDirectory, "client_test-table1", "client_test-table2"); + } + + /** + * Checks that the specified directory contains the files with the specified prefixes. 
+ * + * @param directory the directory containing the data files + * @param prefixes the file prefixes + */ + private static void assertContainsDataFiles(File directory, String... prefixes) + { + FilenameFilter filter = new FilenameFilter() + { + @Override + public boolean accept(File dir, String name) + { + return name.endsWith("-Data.db"); + } + }; + + File[] dataFiles = directory.listFiles(filter); + Arrays.sort(dataFiles); + + assertEquals(dataFiles.length, prefixes.length); + for (int i = 0; i < dataFiles.length; i++) + assertTrue(dataFiles[i].toString().contains(prefixes[i])); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fb67c41a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index 0f123a4..0922502 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -25,24 +25,25 @@ import java.util.Iterator; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; + +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.cql3.*; -import org.apache.cassandra.db.*; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ByteBufferUtil; 
import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.OutputHandler; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + public class CQLSSTableWriterTest { @BeforeClass @@ -51,6 +52,12 @@ public class CQLSSTableWriterTest StorageService.instance.initServer(); } + @AfterClass + public static void tearDown() + { + Config.setClientMode(false); + } + @Test public void testUnsortedWriter() throws Exception { @@ -176,12 +183,12 @@ public class CQLSSTableWriterTest @Override public void run() { - String schema = "CREATE TABLE cql_keyspace.table2 (" + String schema = "CREATE TABLE cql_keyspace2.table2 (" + " k int," + " v int," + " PRIMARY KEY (k, v)" + ")"; - String insert = "INSERT INTO cql_keyspace.table2 (k, v) VALUES (?, ?)"; + String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)"; CQLSSTableWriter writer = CQLSSTableWriter.builder() .inDirectory(dataDir) .forTable(schema) @@ -206,7 +213,7 @@ public class CQLSSTableWriterTest @Test public void testConcurrentWriters() throws Exception { - String KS = "cql_keyspace"; + String KS = "cql_keyspace2"; String TABLE = "table2"; File tempdir = Files.createTempDir(); @@ -235,7 +242,7 @@ public class CQLSSTableWriterTest { public void init(String keyspace) { - for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace")) + for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace2")) addRangeForEndpoint(range, FBUtilities.getBroadcastAddress()); setPartitioner(StorageService.getPartitioner()); } @@ -248,7 +255,7 @@ public class CQLSSTableWriterTest loader.stream().get(); - UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace.table2;"); + UntypedResultSet rs = QueryProcessor.processInternal("SELECT * FROM cql_keyspace2.table2;"); assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); } }