This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit ecd23f1da5894511cccac6c8445f962f3b73f733
Merge: 2beebbb e2ecdf2
Author: Sylvain Lebresne <lebre...@gmail.com>
AuthorDate: Mon Aug 17 11:32:46 2020 +0200

    Merge commit 'e2ecdf268a82fa3ac0f4c9fe77ab35bca33cc72a' into cassandra-3.11

 CHANGES.txt                                        |  1 +
 .../cassandra/db/SinglePartitionReadCommand.java   | 24 ----------------------
 .../db/compaction/AbstractCompactionStrategy.java  |  5 -----
 .../db/compaction/CompactionStrategyManager.java   |  7 -------
 .../compaction/SizeTieredCompactionStrategy.java   |  6 ------
 5 files changed, 1 insertion(+), 42 deletions(-)

diff --cc CHANGES.txt
index 5168acb,9b4f8c3..a6bc9d9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.22:
 +3.11.8
 + * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
 +Merged from 3.0:
+  * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432)
   * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
   * Operational improvements and hardening for replica filtering protection (CASSANDRA-15907)
   * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index a820a89,ca4e8e3..c5de444
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -918,8 -882,7 +918,7 @@@ public class SinglePartitionReadComman
          }
  
          /* add the SSTables on disk */
 -        Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
 +        Collections.sort(view.sstables, SSTableReader.maxTimestampDescending);
-         boolean onlyUnrepaired = true;
          // read sorted sstables
          SSTableReadMetricsCollector metricsCollector = new 
SSTableReadMetricsCollector();
          for (SSTableReader sstable : view.sstables)
@@@ -1012,28 -967,7 +1008,8 @@@
  
          DecoratedKey key = result.partitionKey();
          
cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), 
key.hashCode(), 1);
 +        StorageHook.instance.reportRead(cfs.metadata.cfId, partitionKey());
  
-         // "hoist up" the requested data into a more recent sstable
-         if (metricsCollector.getMergedSSTables() > 
cfs.getMinimumCompactionThreshold()
-             && onlyUnrepaired
-             && !cfs.isAutoCompactionDisabled()
-             && cfs.getCompactionStrategyManager().shouldDefragment())
-         {
-             // !!WARNING!!   if we stop copying our data to a heap-managed 
object,
-             //               we will need to track the lifetime of this 
mutation as well
-             Tracing.trace("Defragmenting requested data");
- 
-             try (UnfilteredRowIterator iter = 
result.unfilteredIterator(columnFilter(), Slices.ALL, false))
-             {
-                 final Mutation mutation = new 
Mutation(PartitionUpdate.fromIterator(iter, columnFilter()));
-                 StageManager.getStage(Stage.MUTATION).execute(() -> {
-                     // skipping commitlog and index updates is fine since 
we're just de-fragmenting existing data
-                     Keyspace.open(mutation.getKeyspaceName()).apply(mutation, 
false, false);
-                 });
-             }
-         }
- 
          return result.unfilteredIterator(columnFilter(), Slices.ALL, 
clusteringIndexFilter().isReversed());
      }
  
diff --cc 
src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 86170a1,5d4a254..d486679
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -102,11 -64,8 +102,10 @@@ public class CompactionStrategyManager 
  
          If a user changes the local compaction strategy and then later ALTERs 
a compaction parameter,
          we will use the new compaction parameters.
 -     */
 -    private CompactionParams schemaCompactionParams;
 +     **/
 +    private volatile CompactionParams schemaCompactionParams;
-     private boolean shouldDefragment;
 +    private boolean supportsEarlyOpen;
 +    private int fanout;
  
      public CompactionStrategyManager(ColumnFamilyStore cfs)
      {
@@@ -206,28 -129,13 +205,27 @@@
  
      private void startup()
      {
 -        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
 +        writeLock.lock();
 +        try
          {
 -            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
 -                getCompactionStrategyFor(sstable).addSSTable(sstable);
 +            for (SSTableReader sstable : 
cfs.getSSTables(SSTableSet.CANONICAL))
 +            {
 +                if (sstable.openReason != SSTableReader.OpenReason.EARLY)
 +                    compactionStrategyFor(sstable).addSSTable(sstable);
 +            }
 +            repaired.forEach(AbstractCompactionStrategy::startup);
 +            unrepaired.forEach(AbstractCompactionStrategy::startup);
-             shouldDefragment = repaired.get(0).shouldDefragment();
 +            supportsEarlyOpen = repaired.get(0).supportsEarlyOpen();
 +            fanout = (repaired.get(0) instanceof LeveledCompactionStrategy) ? 
((LeveledCompactionStrategy) repaired.get(0)).getLevelFanoutSize() : 
LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE;
          }
 -        repaired.startup();
 -        unrepaired.startup();
 +        finally
 +        {
 +            writeLock.unlock();
 +        }
 +        repaired.forEach(AbstractCompactionStrategy::startup);
 +        unrepaired.forEach(AbstractCompactionStrategy::startup);
 +        if (Stream.concat(repaired.stream(), unrepaired.stream()).anyMatch(cs 
-> cs.logAll))
 +            compactionLogger.enable();
      }
  
      /**
@@@ -472,124 -235,75 +470,119 @@@
          return res;
      }
  
-     public boolean shouldDefragment()
-     {
-         return shouldDefragment;
-     }
- 
      public Directories getDirectories()
      {
 -        assert repaired.getClass().equals(unrepaired.getClass());
 -        return repaired.getDirectories();
 +        maybeReloadDiskBoundaries();
 +        readLock.lock();
 +        try
 +        {
 +            assert 
repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
 +            return repaired.get(0).getDirectories();
 +        }
 +        finally
 +        {
 +            readLock.unlock();
 +        }
      }
  
 -    public synchronized void handleNotification(INotification notification, 
Object sender)
 +    private void handleFlushNotification(Iterable<SSTableReader> added)
      {
 -        if (notification instanceof SSTableAddedNotification)
 +        // If reloaded, SSTables will be placed in their correct locations
 +        // so there is no need to process notification
 +        if (maybeReloadDiskBoundaries())
 +            return;
 +
 +        readLock.lock();
 +        try
          {
 -            SSTableAddedNotification flushedNotification = 
(SSTableAddedNotification) notification;
 -            for (SSTableReader sstable : flushedNotification.added)
 -            {
 -                if (sstable.isRepaired())
 -                    repaired.addSSTable(sstable);
 -                else
 -                    unrepaired.addSSTable(sstable);
 -            }
 +            for (SSTableReader sstable : added)
 +                compactionStrategyFor(sstable).addSSTable(sstable);
          }
 -        else if (notification instanceof SSTableListChangedNotification)
 +        finally
          {
 -            SSTableListChangedNotification listChangedNotification = 
(SSTableListChangedNotification) notification;
 -            Set<SSTableReader> repairedRemoved = new HashSet<>();
 -            Set<SSTableReader> repairedAdded = new HashSet<>();
 -            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
 -            Set<SSTableReader> unrepairedAdded = new HashSet<>();
 +            readLock.unlock();
 +        }
 +    }
 +
 +    private void handleListChangedNotification(Iterable<SSTableReader> added, 
Iterable<SSTableReader> removed)
 +    {
 +        // If reloaded, SSTables will be placed in their correct locations
 +        // so there is no need to process notification
 +        if (maybeReloadDiskBoundaries())
 +            return;
 +
 +        readLock.lock();
 +        try
 +        {
 +            // a bit of gymnastics to be able to replace sstables in 
compaction strategies
 +            // we use this to know that a compaction finished and where to 
start the next compaction in LCS
 +            int locationSize = partitionSSTablesByTokenRange? 
currentBoundaries.directories.size() : 1;
  
 -            for (SSTableReader sstable : listChangedNotification.removed)
 +            List<Set<SSTableReader>> repairedRemoved = new 
ArrayList<>(locationSize);
 +            List<Set<SSTableReader>> repairedAdded = new 
ArrayList<>(locationSize);
 +            List<Set<SSTableReader>> unrepairedRemoved = new 
ArrayList<>(locationSize);
 +            List<Set<SSTableReader>> unrepairedAdded = new 
ArrayList<>(locationSize);
 +
 +            for (int i = 0; i < locationSize; i++)
              {
 -                if (sstable.isRepaired())
 -                    repairedRemoved.add(sstable);
 -                else
 -                    unrepairedRemoved.add(sstable);
 +                repairedRemoved.add(new HashSet<>());
 +                repairedAdded.add(new HashSet<>());
 +                unrepairedRemoved.add(new HashSet<>());
 +                unrepairedAdded.add(new HashSet<>());
              }
 -            for (SSTableReader sstable : listChangedNotification.added)
 +
 +            for (SSTableReader sstable : removed)
              {
 +                int i = compactionStrategyIndexFor(sstable);
                  if (sstable.isRepaired())
 -                    repairedAdded.add(sstable);
 +                    repairedRemoved.get(i).add(sstable);
                  else
 -                    unrepairedAdded.add(sstable);
 +                    unrepairedRemoved.get(i).add(sstable);
              }
 -            if (!repairedRemoved.isEmpty())
 +            for (SSTableReader sstable : added)
              {
 -                repaired.replaceSSTables(repairedRemoved, repairedAdded);
 +                int i = compactionStrategyIndexFor(sstable);
 +                if (sstable.isRepaired())
 +                    repairedAdded.get(i).add(sstable);
 +                else
 +                    unrepairedAdded.get(i).add(sstable);
              }
 -            else
 +            for (int i = 0; i < locationSize; i++)
              {
 -                for (SSTableReader sstable : repairedAdded)
 -                    repaired.addSSTable(sstable);
 -            }
 +                if (!repairedRemoved.get(i).isEmpty())
 +                    repaired.get(i).replaceSSTables(repairedRemoved.get(i), 
repairedAdded.get(i));
 +                else
 +                    repaired.get(i).addSSTables(repairedAdded.get(i));
  
 -            if (!unrepairedRemoved.isEmpty())
 -            {
 -                unrepaired.replaceSSTables(unrepairedRemoved, 
unrepairedAdded);
 -            }
 -            else
 -            {
 -                for (SSTableReader sstable : unrepairedAdded)
 -                    unrepaired.addSSTable(sstable);
 +                if (!unrepairedRemoved.get(i).isEmpty())
 +                    
unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), 
unrepairedAdded.get(i));
 +                else
 +                    unrepaired.get(i).addSSTables(unrepairedAdded.get(i));
              }
          }
 -        else if (notification instanceof SSTableRepairStatusChanged)
 +        finally
          {
 -            for (SSTableReader sstable : ((SSTableRepairStatusChanged) 
notification).sstable)
 +            readLock.unlock();
 +        }
 +    }
 +
 +    private void 
handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
 +    {
 +        // If reloaded, SSTables will be placed in their correct locations
 +        // so there is no need to process notification
 +        if (maybeReloadDiskBoundaries())
 +            return;
 +        // we need a write lock here since we move sstables from one strategy 
instance to another
 +        readLock.lock();
 +        try
 +        {
 +            for (SSTableReader sstable : sstables)
              {
 +                int index = compactionStrategyIndexFor(sstable);
                  if (sstable.isRepaired())
                  {
 -                    unrepaired.removeSSTable(sstable);
 -                    repaired.addSSTable(sstable);
 +                    unrepaired.get(index).removeSSTable(sstable);
 +                    repaired.get(index).addSSTable(sstable);
                  }
                  else
                  {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to