This is an automated email from the ASF dual-hosted git repository. slebresne pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit ecd23f1da5894511cccac6c8445f962f3b73f733 Merge: 2beebbb e2ecdf2 Author: Sylvain Lebresne <lebre...@gmail.com> AuthorDate: Mon Aug 17 11:32:46 2020 +0200 Merge commit 'e2ecdf268a82fa3ac0f4c9fe77ab35bca33cc72a' into cassandra-3.11 CHANGES.txt | 1 + .../cassandra/db/SinglePartitionReadCommand.java | 24 ---------------------- .../db/compaction/AbstractCompactionStrategy.java | 5 ----- .../db/compaction/CompactionStrategyManager.java | 7 ------- .../compaction/SizeTieredCompactionStrategy.java | 6 ------ 5 files changed, 1 insertion(+), 42 deletions(-) diff --cc CHANGES.txt index 5168acb,9b4f8c3..a6bc9d9 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,7 -1,5 +1,8 @@@ -3.0.22: +3.11.8 + * Fix short read protection for GROUP BY queries (CASSANDRA-15459) + * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857) +Merged from 3.0: + * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432) * Check for endpoint collision with hibernating nodes (CASSANDRA-14599) * Operational improvements and hardening for replica filtering protection (CASSANDRA-15907) * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191) diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index a820a89,ca4e8e3..c5de444 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@@ -918,8 -882,7 +918,7 @@@ public class SinglePartitionReadComman } /* add the SSTables on disk */ - Collections.sort(view.sstables, SSTableReader.maxTimestampComparator); + Collections.sort(view.sstables, SSTableReader.maxTimestampDescending); - boolean onlyUnrepaired = true; // read sorted sstables SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector(); for (SSTableReader sstable : view.sstables) @@@ -1012,28 -967,7 +1008,8 @@@ DecoratedKey key = result.partitionKey(); cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1); + StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey()); - // "hoist up" the requested data into a more recent sstable - if (metricsCollector.getMergedSSTables() > cfs.getMinimumCompactionThreshold() - && onlyUnrepaired - && !cfs.isAutoCompactionDisabled() - && cfs.getCompactionStrategyManager().shouldDefragment()) - { - // !!WARNING!! if we stop copying our data to a heap-managed object, - // we will need to track the lifetime of this mutation as well - Tracing.trace("Defragmenting requested data"); - - try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false)) - { - final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter, columnFilter())); - StageManager.getStage(Stage.MUTATION).execute(() -> { - // skipping commitlog and index updates is fine since we're just de-fragmenting existing data - Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false); - }); - } - } - return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed()); } diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 86170a1,5d4a254..d486679 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@@ -102,11 -64,8 +102,10 @@@ public class CompactionStrategyManager If a user changes the local compaction strategy and then later ALTERs a compaction parameter, we will use the new compaction parameters. - */ - private CompactionParams schemaCompactionParams; + **/ + private volatile CompactionParams schemaCompactionParams; - private boolean shouldDefragment; + private boolean supportsEarlyOpen; + private int fanout; public CompactionStrategyManager(ColumnFamilyStore cfs) { @@@ -206,28 -129,13 +205,27 @@@ private void startup() { - for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) + writeLock.lock(); + try { - if (sstable.openReason != SSTableReader.OpenReason.EARLY) - getCompactionStrategyFor(sstable).addSSTable(sstable); + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) + { + if (sstable.openReason != SSTableReader.OpenReason.EARLY) + compactionStrategyFor(sstable).addSSTable(sstable); + } + repaired.forEach(AbstractCompactionStrategy::startup); + unrepaired.forEach(AbstractCompactionStrategy::startup); - shouldDefragment = repaired.get(0).shouldDefragment(); + supportsEarlyOpen = repaired.get(0).supportsEarlyOpen(); + fanout = (repaired.get(0) instanceof LeveledCompactionStrategy) ? ((LeveledCompactionStrategy) repaired.get(0)).getLevelFanoutSize() : LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE; } - repaired.startup(); - unrepaired.startup(); + finally + { + writeLock.unlock(); + } + repaired.forEach(AbstractCompactionStrategy::startup); + unrepaired.forEach(AbstractCompactionStrategy::startup); + if (Stream.concat(repaired.stream(), unrepaired.stream()).anyMatch(cs -> cs.logAll)) + compactionLogger.enable(); } /** @@@ -472,124 -235,75 +470,119 @@@ return res; } - public boolean shouldDefragment() - { - return shouldDefragment; - } - public Directories getDirectories() { - assert repaired.getClass().equals(unrepaired.getClass()); - return repaired.getDirectories(); + maybeReloadDiskBoundaries(); + readLock.lock(); + try + { + assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass()); + return repaired.get(0).getDirectories(); + } + finally + { + readLock.unlock(); + } } - public synchronized void handleNotification(INotification notification, Object sender) + private void handleFlushNotification(Iterable<SSTableReader> added) { - if (notification instanceof SSTableAddedNotification) + // If reloaded, SSTables will be placed in their correct locations + // so there is no need to process notification + if (maybeReloadDiskBoundaries()) + return; + + readLock.lock(); + try { - SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification; - for (SSTableReader sstable : flushedNotification.added) - { - if (sstable.isRepaired()) - repaired.addSSTable(sstable); - else - unrepaired.addSSTable(sstable); - } + for (SSTableReader sstable : added) + compactionStrategyFor(sstable).addSSTable(sstable); } - else if (notification instanceof SSTableListChangedNotification) + finally { - SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; - Set<SSTableReader> repairedRemoved = new HashSet<>(); - Set<SSTableReader> repairedAdded = new HashSet<>(); - Set<SSTableReader> unrepairedRemoved = new HashSet<>(); - Set<SSTableReader> unrepairedAdded = new HashSet<>(); + readLock.unlock(); + } + } + + private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed) + { + // If reloaded, SSTables will be placed in their correct locations + // so there is no need to process notification + if (maybeReloadDiskBoundaries()) + return; + + readLock.lock(); + try + { + // a bit of gymnastics to be able to replace sstables in compaction strategies + // we use this to know that a compaction finished and where to start the next compaction in LCS + int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1; - for (SSTableReader sstable : listChangedNotification.removed) + List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize); + List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize); + List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize); + List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize); + + for (int i = 0; i < locationSize; i++) { - if (sstable.isRepaired()) - repairedRemoved.add(sstable); - else - unrepairedRemoved.add(sstable); + repairedRemoved.add(new HashSet<>()); + repairedAdded.add(new HashSet<>()); + unrepairedRemoved.add(new HashSet<>()); + unrepairedAdded.add(new HashSet<>()); } - for (SSTableReader sstable : listChangedNotification.added) + + for (SSTableReader sstable : removed) { + int i = compactionStrategyIndexFor(sstable); if (sstable.isRepaired()) - repairedAdded.add(sstable); + repairedRemoved.get(i).add(sstable); else - unrepairedAdded.add(sstable); + unrepairedRemoved.get(i).add(sstable); } - if (!repairedRemoved.isEmpty()) + for (SSTableReader sstable : added) { - repaired.replaceSSTables(repairedRemoved, repairedAdded); + int i = compactionStrategyIndexFor(sstable); + if (sstable.isRepaired()) + repairedAdded.get(i).add(sstable); + else + unrepairedAdded.get(i).add(sstable); } - else + for (int i = 0; i < locationSize; i++) { - for (SSTableReader sstable : repairedAdded) - repaired.addSSTable(sstable); - } + if (!repairedRemoved.get(i).isEmpty()) + repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i)); + else + repaired.get(i).addSSTables(repairedAdded.get(i)); - if (!unrepairedRemoved.isEmpty()) - { - unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded); - } - else - { - for (SSTableReader sstable : unrepairedAdded) - unrepaired.addSSTable(sstable); + if (!unrepairedRemoved.get(i).isEmpty()) + unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i)); + else + unrepaired.get(i).addSSTables(unrepairedAdded.get(i)); } } - else if (notification instanceof SSTableRepairStatusChanged) + finally { - for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable) + readLock.unlock(); + } + } + + private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables) + { + // If reloaded, SSTables will be placed in their correct locations + // so there is no need to process notification + if (maybeReloadDiskBoundaries()) + return; + // we need a write lock here since we move sstables from one strategy instance to another + readLock.lock(); + try + { + for (SSTableReader sstable : sstables) { + int index = compactionStrategyIndexFor(sstable); if (sstable.isRepaired()) { - unrepaired.removeSSTable(sstable); - repaired.addSSTable(sstable); + unrepaired.get(index).removeSSTable(sstable); + repaired.get(index).addSSTable(sstable); } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org