This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new b105e91  liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted
b105e91 is described below

commit b105e919678240b5f448df9acaf6c93117f0c0cc
Author: David Capwell <dcapw...@gmail.com>
AuthorDate: Fri Mar 27 18:14:02 2020 -0700

    liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted

    Patch by David Capwell; reviewed by marcuse for CASSANDRA-15674
---
 CHANGES.txt                                        |   1 +
 .../db/lifecycle/LifecycleTransaction.java         |  44 +++++++
 .../io/sstable/IndexSummaryRedistribution.java     |  27 +++-
 .../cassandra/io/sstable/format/SSTableReader.java |   6 -
 .../apache/cassandra/io/DiskSpaceMetricsTest.java  | 139 +++++++++++++++++++++
 5 files changed, 210 insertions(+), 7 deletions(-)
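For context on the failure mode this patch addresses, here is a rough sketch of the pre-patch sequence as it can be read out of the diff below; the step numbering is illustrative, not from the patch:

    // Pre-patch sequence, per sstable being resampled:
    //   1. cloneWithNewSummarySamplingLevel() eagerly decrements StorageMetrics.load,
    //      liveDiskSpaceUsed and totalDiskSpaceUsed by the old on-disk size,
    //      relying on the later Transactional.commit() to add the new size back
    //   2. the replacement reader is staged via txn.update(replacement, true)
    //   3. isStopRequested() fires -> CompactionInterruptedException
    //   4. the transaction aborts: the staged replacement from step 2 is rolled
    //      back, but the decrement from step 1 is never compensated, so the
    //      counters drift low permanently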
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e6f418..0a0a4d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * liveDiskSpaceUsed and totalDiskSpaceUsed get corrupted if IndexSummaryRedistribution gets interrupted (CASSANDRA-15674)
 * Fix Debian init start/stop (CASSANDRA-15770)
 * Fix infinite loop on index query paging in tables with clustering (CASSANDRA-14242)
 * Fix chunk index overflow due to large sstable with small chunk length (CASSANDRA-15595)
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index 7ecaa38..4abce33 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableReader.UniqueIdentifier;
+import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Transactional;

 import static com.google.common.base.Functions.compose;
@@ -123,6 +124,10 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
     // the tidier and their readers, to be used for marking readers obsoleted during a commit
     private List<LogTransaction.Obsoletion> obsoletions;

+    // commit/rollback hooks
+    private List<Runnable> commitHooks = new ArrayList<>();
+    private List<Runnable> abortHooks = new ArrayList<>();
+
     /**
      * construct a Transaction for use in an offline operation
      */
@@ -223,12 +228,14 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
         accumulate = markObsolete(obsoletions, accumulate);
         accumulate = tracker.updateSizeTracking(logged.obsolete, logged.update, accumulate);
+        accumulate = runOnCommitHooks(accumulate);
         accumulate = release(selfRefs(logged.obsolete), accumulate);
         accumulate = tracker.notifySSTablesChanged(originals, logged.update, log.type(), accumulate);

         return accumulate;
     }

+
     /**
      * undo all of the changes made by this transaction, resetting the state to its original form
      */
@@ -259,6 +266,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
         accumulate = tracker.notifySSTablesChanged(invalid, restored, OperationType.COMPACTION, accumulate);
         // setReplaced immediately preceding versions that have not been obsoleted
         accumulate = setReplaced(logged.update, accumulate);
+        accumulate = runOnAbortHooks(accumulate);
         // we have replaced all of logged.update and never made visible staged.update,
         // and the files we have logged as obsolete we clone fresh versions of, so they are no longer needed either
         // any _staged_ obsoletes should either be in staged.update already, and dealt with there,
@@ -270,6 +278,32 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
         return accumulate;
     }

+    private Throwable runOnCommitHooks(Throwable accumulate)
+    {
+        return runHooks(commitHooks, accumulate);
+    }
+
+    private Throwable runOnAbortHooks(Throwable accumulate)
+    {
+        return runHooks(abortHooks, accumulate);
+    }
+
+    private static Throwable runHooks(Iterable<Runnable> hooks, Throwable accumulate)
+    {
+        for (Runnable hook : hooks)
+        {
+            try
+            {
+                hook.run();
+            }
+            catch (Exception e)
+            {
+                accumulate = Throwables.merge(accumulate, e);
+            }
+        }
+        return accumulate;
+    }
+
     @Override
     protected Throwable doPostCleanup(Throwable accumulate)
     {
@@ -366,6 +400,16 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional im
         staged.obsolete.add(reader);
     }

+    public void runOnCommit(Runnable fn)
+    {
+        commitHooks.add(fn);
+    }
+
+    public void runOnAbort(Runnable fn)
+    {
+        abortHooks.add(fn);
+    }
+
     /**
      * obsolete every file in the original transaction
      */
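A minimal sketch of how a caller might use the new hooks; apart from runOnCommit/runOnAbort everything here mirrors existing LifecycleTransaction usage, and the logging is purely illustrative:

    LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
    txn.runOnCommit(() -> logger.debug("runs inside doCommit(), after size tracking is updated"));
    txn.runOnAbort(() -> logger.debug("runs inside doAbort(), after the originals are restored"));
    try
    {
        // ... stage changes, e.g. txn.update(replacement, true) ...
        txn.finish(); // commit path: commit hooks run, abort hooks do not
    }
    catch (Throwable t)
    {
        txn.abort(); // abort path: abort hooks run; a hook failure is merged via Throwables.merge
        throw t;
    }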
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
index 189ee2d..45bd7eb 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryRedistribution.java
@@ -37,11 +37,11 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionInterruptedException;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.CompactionInfo.Unit;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Refs;

@@ -261,14 +261,39 @@ public class IndexSummaryRedistribution extends CompactionInfo.Holder
                         sstable, sstable.getIndexSummarySamplingLevel(), Downsampling.BASE_SAMPLING_LEVEL,
                         entry.newSamplingLevel, Downsampling.BASE_SAMPLING_LEVEL);
             ColumnFamilyStore cfs = Keyspace.open(sstable.metadata.ksName).getColumnFamilyStore(sstable.metadata.cfId);
+            long oldSize = sstable.bytesOnDisk();
             SSTableReader replacement = sstable.cloneWithNewSummarySamplingLevel(cfs, entry.newSamplingLevel);
+            long newSize = replacement.bytesOnDisk();
             newSSTables.add(replacement);
             transactions.get(sstable.metadata.cfId).update(replacement, true);
+            addHooks(cfs, transactions, oldSize, newSize);
         }

         return newSSTables;
     }

+    /**
+     * Add hooks to correctly update the storage load metrics once the transaction is closed/aborted
+     */
+    @SuppressWarnings("resource") // Transactions are closed in finally outside of this method
+    private void addHooks(ColumnFamilyStore cfs, Map<UUID, LifecycleTransaction> transactions, long oldSize, long newSize)
+    {
+        LifecycleTransaction txn = transactions.get(cfs.metadata.cfId);
+        txn.runOnCommit(() -> {
+            // The new size will be added in Transactional.commit() as an updated SSTable, more details: CASSANDRA-13738
+            StorageMetrics.load.dec(oldSize);
+            cfs.metric.liveDiskSpaceUsed.dec(oldSize);
+            cfs.metric.totalDiskSpaceUsed.dec(oldSize);
+        });
+        txn.runOnAbort(() -> {
+            // the local disk was modified but the bookkeeping couldn't be committed, so apply the delta
+            long delta = oldSize - newSize; // if the new file is larger this is negative, so dec becomes an inc
+            StorageMetrics.load.dec(delta);
+            cfs.metric.liveDiskSpaceUsed.dec(delta);
+            cfs.metric.totalDiskSpaceUsed.dec(delta);
+        });
+    }
+
     @VisibleForTesting
     static Pair<List<SSTableReader>, List<ResampleEntry>> distributeRemainingSpace(List<ResampleEntry> toDownsample, long remainingSpace)
     {
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index c094e0b..0485275 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1133,7 +1133,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         double effectiveInterval = indexSummary.getEffectiveIndexInterval();

         IndexSummary newSummary;
-        long oldSize = bytesOnDisk();

         // We have to rebuild the summary from the on-disk primary index in three cases:
         // 1. The sampling level went up, so we need to read more entries off disk
@@ -1162,11 +1161,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             saveSummary(ibuilder, dbuilder, newSummary);
         }

-        // The new size will be added in Transactional.commit() as an updated SSTable, more details: CASSANDRA-13738
-        StorageMetrics.load.dec(oldSize);
-        parent.metric.liveDiskSpaceUsed.dec(oldSize);
-        parent.metric.totalDiskSpaceUsed.dec(oldSize);
-
         return cloneAndReplace(first, OpenReason.METADATA_CHANGE, newSummary);
     }
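To make the abort-hook arithmetic concrete, a worked example with made-up sizes (dec() on these counters subtracts its argument, so a negative delta becomes an increment):

    long oldSize = 1000; // bytes on disk before the resample
    long newSize = 800;  // bytes on disk after; the summary shrank
    long delta = oldSize - newSize;          // 200
    cfs.metric.liveDiskSpaceUsed.dec(delta); // counter drops by 200, matching disk
    // If the summary had grown instead (newSize = 1200), delta would be -200
    // and dec(-200) would raise the counter by 200, again matching the disk.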
diff --git a/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
new file mode 100644
index 0000000..ddacc6b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/DiskSpaceMetricsTest.java
@@ -0,0 +1,139 @@
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.io.sstable.IndexSummaryManager;
+import org.apache.cassandra.io.sstable.IndexSummaryRedistribution;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DiskSpaceMetricsTest extends CQLTester
+{
+    /**
+     * This test runs the system with normal operations and makes sure the disk metrics match reality
+     */
+    @Test
+    public void baseline() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH min_index_interval=1");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        // disable compaction so nothing changes between calculations
+        cfs.disableAutoCompaction();
+
+        // create 100 sstables
+        for (int i = 0; i < 100; i++)
+            insert(cfs, i);
+        assertDiskSpaceEqual(cfs);
+    }
+
+    /**
+     * If index summary downsampling is interrupted in the middle, the metrics should still reflect the real data
+     */
+    @Test
+    public void summaryRedistribution() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk bigint, PRIMARY KEY (pk)) WITH min_index_interval=1");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        // disable compaction so nothing changes between calculations
+        cfs.disableAutoCompaction();
+
+        // create 100 sstables; make sure each has more than one value, else sampling can't happen
+        for (int i = 0; i < 100; i++)
+            insertN(cfs, 10, i);
+        assertDiskSpaceEqual(cfs);
+
+        // summary downsample
+        for (int i = 0; i < 100; i++)
+        {
+            indexDownsampleCancelLastSSTable(cfs);
+            assertDiskSpaceEqual(cfs);
+        }
+    }
+
+    private void insert(ColumnFamilyStore cfs, long value) throws Throwable
+    {
+        insertN(cfs, 1, value);
+    }
+
+    private void insertN(ColumnFamilyStore cfs, int n, long base) throws Throwable
+    {
+        for (int i = 0; i < n; i++)
+            execute("INSERT INTO %s (pk) VALUES (?)", base + i);
+
+        // flush to write the sstable
+        cfs.forceBlockingFlush();
+    }
+
+    private void assertDiskSpaceEqual(ColumnFamilyStore cfs)
+    {
+        long liveDiskSpaceUsed = cfs.metric.liveDiskSpaceUsed.getCount();
+        long actual = 0;
+        for (SSTableReader sstable : cfs.getTracker().getView().liveSSTables())
+            actual += sstable.bytesOnDisk();
+
+        Assert.assertEquals("bytes on disk does not match current metric liveDiskSpaceUsed", actual, liveDiskSpaceUsed);
+
+        // totalDiskSpaceUsed is based on SSTable delete, which is async: LogTransaction's tidy enqueues in ScheduledExecutors.nonPeriodicTasks
+        // wait for there to be no more pending sstable releases
+        LifecycleTransaction.waitForDeletions();
+        long totalDiskSpaceUsed = cfs.metric.totalDiskSpaceUsed.getCount();
+        Assert.assertEquals("bytes on disk does not match current metric totalDiskSpaceUsed", actual, totalDiskSpaceUsed);
+    }
+
+    private static void indexDownsampleCancelLastSSTable(ColumnFamilyStore cfs)
+    {
+        List<SSTableReader> sstables = Lists.newArrayList(cfs.getSSTables(SSTableSet.CANONICAL));
+        LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
+        Map<UUID, LifecycleTransaction> txns = ImmutableMap.of(cfs.metadata.cfId, txn);
+        // fail on the last file (* 3 because isStopRequested is called 3 times for each sstable, and we should fail on the last)
+        AtomicInteger countdown = new AtomicInteger(3 * sstables.size() - 1);
+        IndexSummaryRedistribution redistribution = new IndexSummaryRedistribution(Collections.emptyList(), txns, 0)
+        {
+            public boolean isStopRequested()
+            {
+                return countdown.decrementAndGet() == 0;
+            }
+        };
+        try
+        {
+            IndexSummaryManager.redistributeSummaries(redistribution);
+            Assert.fail("Should throw CompactionInterruptedException");
+        }
+        catch (CompactionInterruptedException e)
+        {
+            // expected: this is exactly what the countdown above is trying to trigger
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            try
+            {
+                FBUtilities.closeAll(txns.values());
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org