Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0fe82be8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0fe82be8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0fe82be8 Branch: refs/heads/cassandra-3.11 Commit: 0fe82be83cceceb12172d63913388678253413bc Parents: e9b7a0f 66f1aaf Author: Yuki Morishita <yu...@apache.org> Authored: Tue Dec 13 15:55:34 2016 -0800 Committer: Yuki Morishita <yu...@apache.org> Committed: Tue Dec 13 15:55:34 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/config/DatabaseDescriptor.java | 12 +++++++ .../apache/cassandra/db/ColumnFamilyStore.java | 10 ++++-- .../cassandra/db/PartitionRangeReadCommand.java | 3 +- .../cassandra/db/compaction/CompactionTask.java | 18 +++++++---- .../cassandra/db/lifecycle/LogTransaction.java | 3 +- .../apache/cassandra/db/lifecycle/Tracker.java | 34 ++++++++++++-------- .../cassandra/index/SecondaryIndexManager.java | 4 ++- .../io/sstable/format/SSTableReader.java | 2 +- .../cassandra/service/CassandraDaemon.java | 1 + .../service/EmbeddedCassandraService.java | 2 ++ .../config/DatabaseDescriptorTest.java | 6 ++++ .../org/apache/cassandra/cql3/CQLTester.java | 1 + .../apache/cassandra/db/SystemKeyspaceTest.java | 2 ++ .../db/context/CounterContextTest.java | 8 +++++ .../db/lifecycle/LifecycleTransactionTest.java | 5 ++- .../cassandra/db/lifecycle/TrackerTest.java | 7 ++-- .../cassandra/dht/StreamStateStoreTest.java | 7 ++++ .../cassandra/gms/FailureDetectorTest.java | 2 ++ .../org/apache/cassandra/gms/GossiperTest.java | 5 +++ .../io/sstable/CQLSSTableWriterTest.java | 2 ++ .../cassandra/locator/CloudstackSnitchTest.java | 2 ++ .../apache/cassandra/locator/EC2SnitchTest.java | 2 ++ .../locator/GoogleCloudSnitchTest.java | 2 ++ .../metrics/HintedHandOffMetricsTest.java | 7 ++++ .../service/StorageServiceServerTest.java | 1 + 
.../concurrent/AbstractTransactionalTest.java | 7 ++++ 27 files changed, 124 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 5621c93,8cff097..145afb9 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,18 -1,5 +1,19 @@@ -2.2.9 +3.0.11 + * Mark MVs as built after successful bootstrap (CASSANDRA-12984) + * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040) + * Nodetool compactionstats fails with NullPointerException (CASSANDRA-13021) + * Thread local pools never cleaned up (CASSANDRA-13033) + * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781) + * CQL often queries static columns unnecessarily (CASSANDRA-12768) + * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956) + * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868) + * Nodetool should use a more sane max heap size (CASSANDRA-12739) + * LocalToken ensures token values are cloned on heap (CASSANDRA-12651) + * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934) + * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535) + * Reenable HeapPool (CASSANDRA-12900) +Merged from 2.2: + * Temporarily fix bug that creates commit log when running offline tools (CASSANDRA-8616) * Reduce granuality of OpOrder.Group during index build (CASSANDRA-12796) * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980) * Do not specify local address on outgoing connection when listen_on_broadcast_address is set (CASSANDRA-12673) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 71e1653,4bc46d0..39ed804 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -388,13 -388,17 +388,17 @@@ public class ColumnFamilyStore implemen logger.info("Initializing {}.{}", keyspace.getName(), name); - // scan for sstables corresponding to this cf and load them - data = new Tracker(this, loadSSTables); + // Create Memtable only on online + Memtable initialMemtable = null; + if (DatabaseDescriptor.isDaemonInitialized()) + initialMemtable = new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), this); + data = new Tracker(initialMemtable, loadSSTables); + // scan for sstables corresponding to this cf and load them if (data.loadsstables) { - Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true); - Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner); + Directories.SSTableLister sstableFiles = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true); + Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata); data.addInitialSSTables(sstables); } @@@ -1953,10 -2758,12 +1957,10 @@@ { public Void call() { - cfs.data.reset(); + cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs)); - cfs.getCompactionStrategy().shutdown(); - cfs.getCompactionStrategy().startup(); return null; } - }, true); + }, true, false); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java 
---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 99e24c8,0000000..17adef0 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@@ -1,322 -1,0 +1,323 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.BaseRowIterator; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.pager.*; +import org.apache.cassandra.thrift.ThriftResultsMerger; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; + +/** + * A read command that selects a (part of a) range of partitions. 
+ */ +public class PartitionRangeReadCommand extends ReadCommand +{ + protected static final SelectionDeserializer selectionDeserializer = new Deserializer(); + + private final DataRange dataRange; + private int oldestUnrepairedTombstone = Integer.MAX_VALUE; + + public PartitionRangeReadCommand(boolean isDigest, + int digestVersion, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange, + Optional<IndexMetadata> index) + { + super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + this.dataRange = dataRange; + this.index = index; + } + + public PartitionRangeReadCommand(CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange, + Optional<IndexMetadata> index) + { + this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index); + } + + /** + * Creates a new read command that query all the data in the table. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * + * @return a newly created read command that queries everything in the table. 
+ */ + public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec) + { + return new PartitionRangeReadCommand(metadata, + nowInSec, + ColumnFilter.all(metadata), + RowFilter.NONE, + DataLimits.NONE, + DataRange.allData(metadata.partitioner), + Optional.empty()); + } + + public DataRange dataRange() + { + return dataRange; + } + + public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key) + { + return dataRange.clusteringIndexFilter(key); + } + + public boolean isNamesQuery() + { + return dataRange.isNamesQuery(); + } + + public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range) + { + return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index); + } + + public PartitionRangeReadCommand copy() + { + return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index); + } + + public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits) + { + return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index); + } + + public long getTimeout() + { + return DatabaseDescriptor.getRangeRpcTimeout(); + } + + public boolean selectsKey(DecoratedKey key) + { + if (!dataRange().contains(key)) + return false; + + return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator()); + } + + public boolean selectsClustering(DecoratedKey key, Clustering clustering) + { + if (clustering == Clustering.STATIC_CLUSTERING) + return !columnFilter().fetchedColumns().statics.isEmpty(); + + if (!dataRange().clusteringIndexFilter(key).selects(clustering)) + return false; + return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering); + } + + public PartitionIterator execute(ConsistencyLevel consistency, 
ClientState clientState) throws RequestExecutionException + { + return StorageProxy.getRangeSlice(this, consistency); + } + + public QueryPager getPager(PagingState pagingState, int protocolVersion) + { + return new PartitionRangeQueryPager(this, pagingState, protocolVersion); + } + + protected void recordLatency(TableMetrics metric, long latencyNanos) + { + metric.rangeLatency.addNano(latencyNanos); + } + + protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup) + { + ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange())); + Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator())); + + // fetch data from current memtable, historical memtables, and SSTables in the correct order. + final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size()); + + try + { + for (Memtable memtable : view.memtables) + { + @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method + Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift()); + oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime()); + iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter); + } + + for (SSTableReader sstable : view.sstables) + { + @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method + UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift()); + iterators.add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter); + if (!sstable.isRepaired()) + oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime()); + } - return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs); ++ return iterators.isEmpty() ? EmptyIterators.unfilteredPartition(metadata(), isForThrift()) ++ : checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs); + } + catch (RuntimeException | Error e) + { + try + { + FBUtilities.closeAll(iterators); + } + catch (Exception suppressed) + { + e.addSuppressed(suppressed); + } + + throw e; + } + } + + @Override + protected int oldestUnrepairedTombstone() + { + return oldestUnrepairedTombstone; + } + + private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs) + { + class CacheFilter extends Transformation + { + @Override + public BaseRowIterator applyToPartition(BaseRowIterator iter) + { + // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done + // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage. + DecoratedKey dk = iter.partitionKey(); + + // Check if this partition is in the rowCache and if it is, if it covers our filter + CachedPartition cached = cfs.getRawCachedPartition(dk); + ClusteringIndexFilter filter = dataRange().clusteringIndexFilter(dk); + + if (cached != null && cfs.isFilterFullyCoveredBy(filter, limits(), cached, nowInSec())) + { + // We won't use 'iter' so close it now. + iter.close(); + + return filter.getUnfilteredRowIterator(columnFilter(), cached); + } + + return iter; + } + } + return Transformation.apply(iter, new CacheFilter()); + } + + public MessageOut<ReadCommand> createMessage(int version) + { + return dataRange().isPaging() + ? 
new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, pagedRangeSerializer) + : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, rangeSliceSerializer); + } + + protected void appendCQLWhereClause(StringBuilder sb) + { + if (dataRange.isUnrestricted() && rowFilter().isEmpty()) + return; + + sb.append(" WHERE "); + // We put the row filter first because the data range can end by "ORDER BY" + if (!rowFilter().isEmpty()) + { + sb.append(rowFilter()); + if (!dataRange.isUnrestricted()) + sb.append(" AND "); + } + if (!dataRange.isUnrestricted()) + sb.append(dataRange.toCQLString(metadata())); + } + + /** + * Allow to post-process the result of the query after it has been reconciled on the coordinator + * but before it is passed to the CQL layer to return the ResultSet. + * + * See CASSANDRA-8717 for why this exists. + */ + public PartitionIterator postReconciliationProcessing(PartitionIterator result) + { + ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName); + Index index = getIndex(cfs); + return index == null ? 
result : index.postProcessorFor(this).apply(result, this); + } + + @Override + public String toString() + { + return String.format("Read(%s.%s columns=%s rowfilter=%s limits=%s %s)", + metadata().ksName, + metadata().cfName, + columnFilter(), + rowFilter(), + limits(), + dataRange().toString(metadata())); + } + + protected void serializeSelection(DataOutputPlus out, int version) throws IOException + { + DataRange.serializer.serialize(dataRange(), out, version, metadata()); + } + + protected long selectionSerializedSize(int version) + { + return DataRange.serializer.serializedSize(dataRange(), version, metadata()); + } + + private static class Deserializer extends SelectionDeserializer + { + public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) + throws IOException + { + DataRange range = DataRange.serializer.deserialize(in, version, metadata); + return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 0c4e144,20d3dc0..f0a1f47 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@@ -223,15 -218,20 +223,19 @@@ public class CompactionTask extends Abs for (SSTableReader reader : newSStables) newSSTableNames.append(reader.descriptor.baseFilename()).append(","); - double mbps = dTime > 0 ? 
(double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; - Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize); - logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", - taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge)); - logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); - logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten)); - if (offline) + { Refs.release(Refs.selfRefs(newSStables)); + } + else + { + double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; - long totalSourceRows = 0; - String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); ++ Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize); + logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. 
Partition merge counts were {%s}", - taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); ++ taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge)); + logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); - logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten)); ++ logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten)); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java index ca644eb,0000000..350477c mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java @@@ -1,444 -1,0 +1,445 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.lifecycle; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Runnables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LogRecord.Type; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SnapshotDeletingTask; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.concurrent.Ref; +import org.apache.cassandra.utils.concurrent.RefCounted; +import org.apache.cassandra.utils.concurrent.Transactional; + +/** + * IMPORTANT: When this object is involved in a transactional graph, and is not encapsulated in a LifecycleTransaction, + * for correct behaviour its commit MUST occur before any others, since it may legitimately fail. 
This is consistent + * with the Transactional API, which permits one failing action to occur at the beginning of the commit phase, but also + * *requires* that the prepareToCommit() phase only take actions that can be rolled back. + * + * IMPORTANT: The transaction must complete (commit or abort) before any temporary files are deleted, even though the + * txn log file itself will not be deleted until all tracked files are deleted. This is required by FileLister to ensure + * a consistent disk state. LifecycleTransaction ensures this requirement, so this class should really never be used + * outside of LT. @see FileLister.classifyFiles(TransactionData txn) + * + * A class that tracks sstable files involved in a transaction across sstables: + * if the transaction succeeds the old files should be deleted and the new ones kept; vice-versa if it fails. + * + * The transaction log file contains new and old sstables as follows: + * + * add:[sstable-2][CRC] + * remove:[sstable-1,max_update_time,num files][CRC] + * + * where sstable-2 is a new sstable to be retained if the transaction succeeds and sstable-1 is an old sstable to be + * removed. CRC is an incremental CRC of the file content up to this point. For old sstable files we also log the + * last update time of all files for the sstable descriptor and a checksum of vital properties such as update times + * and file sizes. + * + * Upon commit we add a final line to the log file: + * + * commit:[commit_time][CRC] + * + * When the transaction log is cleaned-up by the TransactionTidier, which happens only after any old sstables have been + * obsoleted, then any sstable files for old sstables are removed before deleting the transaction log if the transaction + * was committed, vice-versa if the transaction was aborted. + * + * On start-up we look for any transaction log files and repeat the cleanup process described above. + * + * See CASSANDRA-7066 for full details.
+ */ +class LogTransaction extends Transactional.AbstractTransactional implements Transactional +{ + private static final Logger logger = LoggerFactory.getLogger(LogTransaction.class); + + /** + * If the format of the lines in the transaction log is wrong or the checksum + * does not match, then we throw this exception. + */ + public static final class CorruptTransactionLogException extends RuntimeException + { + public final LogFile txnFile; + + public CorruptTransactionLogException(String message, LogFile txnFile) + { + super(message); + this.txnFile = txnFile; + } + } + + private final Tracker tracker; + private final LogFile txnFile; + private final Ref<LogTransaction> selfRef; + // Deleting sstables is tricky because the mmapping might not have been finalized yet, + // and delete will fail (on Windows) until it is (we only force the unmapping on SUN VMs). + // Additionally, we need to make sure to delete the data file first, so on restart the others + // will be recognized as GCable. + private static final Queue<Runnable> failedDeletions = new ConcurrentLinkedQueue<>(); + + LogTransaction(OperationType opType) + { + this(opType, null); + } + + LogTransaction(OperationType opType, Tracker tracker) + { + this.tracker = tracker; + this.txnFile = new LogFile(opType, UUIDGen.getTimeUUID()); + this.selfRef = new Ref<>(this, new TransactionTidier(txnFile)); + + if (logger.isTraceEnabled()) + logger.trace("Created transaction logs with id {}", txnFile.id()); + } + + /** + * Track a reader as new. + **/ + void trackNew(SSTable table) + { + txnFile.add(Type.ADD, table); + } + + /** + * Stop tracking a reader as new. + */ + void untrackNew(SSTable table) + { + txnFile.remove(Type.ADD, table); + } + + /** + * Schedule a reader for deletion as soon as it is fully unreferenced. 
+ */ + SSTableTidier obsoleted(SSTableReader reader) + { + if (txnFile.contains(Type.ADD, reader)) + { + if (txnFile.contains(Type.REMOVE, reader)) + throw new IllegalArgumentException(); + + return new SSTableTidier(reader, true, this); + } + + txnFile.add(Type.REMOVE, reader); + + if (tracker != null) + tracker.notifyDeleting(reader); + + return new SSTableTidier(reader, false, this); + } + + OperationType type() + { + return txnFile.type(); + } + + UUID id() + { + return txnFile.id(); + } + + @VisibleForTesting + LogFile txnFile() + { + return txnFile; + } + + @VisibleForTesting + List<File> logFiles() + { + return txnFile.getFiles(); + } + + @VisibleForTesting + List<String> logFilePaths() + { + return txnFile.getFilePaths(); + } + + static void delete(File file) + { + try + { + if (logger.isTraceEnabled()) + logger.trace("Deleting {}", file); + + Files.delete(file.toPath()); + } + catch (NoSuchFileException e) + { + logger.error("Unable to delete {} as it does not exist", file); + } + catch (IOException e) + { + logger.error("Unable to delete {}", file, e); + throw new RuntimeException(e); + } + } + + /** + * The transaction tidier. + * + * When the transaction reference is fully released we try to delete all the obsolete files + * depending on the transaction result, as well as the transaction log file. 
+ */ + private static class TransactionTidier implements RefCounted.Tidy, Runnable + { + private final LogFile data; + + TransactionTidier(LogFile data) + { + this.data = data; + } + + public void tidy() throws Exception + { + run(); + } + + public String name() + { + return data.toString(); + } + + public void run() + { + if (logger.isTraceEnabled()) + logger.trace("Removing files for transaction {}", name()); + + if (!data.completed()) + { // this happens if we forget to close a txn and the garbage collector closes it for us + logger.error("{} was not completed, trying to abort it now", data); + Throwable err = Throwables.perform((Throwable)null, data::abort); + if (err != null) + logger.error("Failed to abort {}", data, err); + } + + Throwable err = data.removeUnfinishedLeftovers(null); + + if (err != null) + { + logger.info("Failed deleting files for transaction {}, we'll retry after GC and on on server restart", name(), err); + failedDeletions.add(this); + } + else + { + if (logger.isTraceEnabled()) + logger.trace("Closing file transaction {}", name()); + + data.close(); + } + } + } + + static class Obsoletion + { + final SSTableReader reader; + final SSTableTidier tidier; + + Obsoletion(SSTableReader reader, SSTableTidier tidier) + { + this.reader = reader; + this.tidier = tidier; + } + } + + /** + * The SSTableReader tidier. When a reader is fully released and no longer referenced + * by any one, we run this. It keeps a reference to the parent transaction and releases + * it when done, so that the final transaction cleanup can run when all obsolete readers + * are released. 
+ */ + public static class SSTableTidier implements Runnable + { + // must not retain a reference to the SSTableReader, else leak detection cannot kick in + private final Descriptor desc; + private final long sizeOnDisk; + private final Tracker tracker; + private final boolean wasNew; + private final Ref<LogTransaction> parentRef; + + public SSTableTidier(SSTableReader referent, boolean wasNew, LogTransaction parent) + { + this.desc = referent.descriptor; + this.sizeOnDisk = referent.bytesOnDisk(); + this.tracker = parent.tracker; + this.wasNew = wasNew; + this.parentRef = parent.selfRef.tryRef(); + } + + public void run() + { - SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation); ++ if (tracker != null && !tracker.isDummy()) ++ SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation); + + try + { + // If we can't successfully delete the DATA component, set the task to be retried later: see TransactionTidier + File datafile = new File(desc.filenameFor(Component.DATA)); + + delete(datafile); + // let the remainder be cleaned up by delete + SSTable.delete(desc, SSTable.discoverComponentsFor(desc)); + } + catch (Throwable t) + { + logger.error("Failed deletion for {}, we'll retry after GC and on server restart", desc); + failedDeletions.add(this); + return; + } + + if (tracker != null && tracker.cfstore != null && !wasNew) + tracker.cfstore.metric.totalDiskSpaceUsed.dec(sizeOnDisk); + + // release the referent to the parent so that the all transaction files can be released + parentRef.release(); + } + + public void abort() + { + parentRef.release(); + } + } + + + static void rescheduleFailedDeletions() + { + Runnable task; + while ( null != (task = failedDeletions.poll())) + ScheduledExecutors.nonPeriodicTasks.submit(task); + + // On Windows, snapshots cannot be deleted so long as a segment of the root element is memory-mapped in NTFS. 
+ SnapshotDeletingTask.rescheduleFailedTasks(); + } + + static void waitForDeletions() + { + FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(Runnables.doNothing(), 0, TimeUnit.MILLISECONDS)); + } + + @VisibleForTesting + Throwable complete(Throwable accumulate) + { + try + { + accumulate = selfRef.ensureReleased(accumulate); + return accumulate; + } + catch (Throwable t) + { + logger.error("Failed to complete file transaction {}", id(), t); + return Throwables.merge(accumulate, t); + } + } + + protected Throwable doCommit(Throwable accumulate) + { + return complete(Throwables.perform(accumulate, txnFile::commit)); + } + + protected Throwable doAbort(Throwable accumulate) + { + return complete(Throwables.perform(accumulate, txnFile::abort)); + } + + protected void doPrepare() { } + + /** + * Called on startup to scan existing folders for any unfinished leftovers of + * operations that were ongoing when the process exited. Also called by the standalone + * sstableutil tool when the cleanup option is specified, @see StandaloneSSTableUtil. 
+ * + */ + static void removeUnfinishedLeftovers(CFMetaData metadata) + { + removeUnfinishedLeftovers(new Directories(metadata, ColumnFamilyStore.getInitialDirectories()).getCFDirectories()); + } + + @VisibleForTesting + static void removeUnfinishedLeftovers(List<File> folders) + { + LogFilesByName logFiles = new LogFilesByName(); + folders.forEach(logFiles::list); + logFiles.removeUnfinishedLeftovers(); + } + + private static final class LogFilesByName + { + Map<String, List<File>> files = new HashMap<>(); + + void list(File folder) + { + Arrays.stream(folder.listFiles(LogFile::isLogFile)).forEach(this::add); + } + + void add(File file) + { + List<File> filesByName = files.get(file.getName()); + if (filesByName == null) + { + filesByName = new ArrayList<>(); + files.put(file.getName(), filesByName); + } + + filesByName.add(file); + } + + void removeUnfinishedLeftovers() + { + files.forEach(LogFilesByName::removeUnfinishedLeftovers); + } + + static void removeUnfinishedLeftovers(String name, List<File> logFiles) + { + + try(LogFile txn = LogFile.make(name, logFiles)) + { + if (txn.verify()) + { + Throwable failure = txn.removeUnfinishedLeftovers(null); + if (failure != null) + logger.error("Failed to remove unfinished transaction leftovers for txn {}", txn, failure); + } + else + { + logger.error("Unexpected disk state: failed to read transaction txn {}", txn); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fe82be8/src/java/org/apache/cassandra/db/lifecycle/Tracker.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/Tracker.java index 5a3d524,e77ef78..9feaa3e --- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java +++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java @@@ -193,15 -204,14 +201,13 @@@ public class Tracke /** (Re)initializes the tracker, purging all references. 
*/ @VisibleForTesting - public void reset() + public void reset(Memtable memtable) { - view.set(new View( - !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfstore)) - : ImmutableList.<Memtable>of(), - ImmutableList.<Memtable>of(), - Collections.<SSTableReader, SSTableReader>emptyMap(), - Collections.<SSTableReader, SSTableReader>emptyMap(), - SSTableIntervalTree.empty())); - view.set(new View(memtable != null ? singletonList(memtable) : Collections.<Memtable>emptyList(), - Collections.<Memtable>emptyList(), - Collections.<SSTableReader, SSTableReader>emptyMap(), - Collections.<SSTableReader>emptySet(), - Collections.<SSTableReader>emptySet(), ++ view.set(new View(memtable != null ? singletonList(memtable) : Collections.emptyList(), ++ Collections.emptyList(), ++ Collections.emptyMap(), ++ Collections.emptyMap(), + SSTableIntervalTree.empty())); } public Throwable dropSSTablesIfInvalid(Throwable accumulate)