merge from 2.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f6446569
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f6446569
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f6446569

Branch: refs/heads/trunk
Commit: f64465693124bb198ebf8499136c610da77412ad
Parents: ace0e6e bfd03f1
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Fri Mar 14 13:16:31 2014 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Fri Mar 14 13:16:31 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/Schema.java     |   7 +-
 .../db/commitlog/CommitLogSegment.java          |  13 +-
 .../apache/cassandra/utils/ConcurrentBiMap.java | 131 +++++++++++++++++++
 4 files changed, 143 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6446569/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 24c8090,6483012..12dccec
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,29 -1,6 +1,30 @@@
 -2.0.7
 +2.1.0-beta2
 + * Eliminate possibility of CL segment appearing twice in active list 
 +   (CASSANDRA-6557)
 + * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759)
 + * Switch CRC component to Adler and include it for compressed sstables 
 +   (CASSANDRA-4165)
 + * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
 + * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
 + * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
 + * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
 + * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692)
 + * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
 + * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
 + * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
 + * Scrub should not always clear out repaired status (CASSANDRA-5351)
 + * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
 + * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
 + * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
 + * Change caching option syntax (CASSANDRA-6745)
 + * Fix stress to do proper counter reads (CASSANDRA-6835)
 + * Fix help message for stress counter_write (CASSANDRA-6824)
 + * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848)
 + * Add logging levels (minimal, normal or verbose) to stress tool 
(CASSANDRA-6849)
 +Merged from 2.0:
+  * Fix schema concurrency exceptions (CASSANDRA-6841)
   * Fix leaking validator FH in StreamWriter (CASSANDRA-6832)
 + * fix nodetool getsstables for blob PK (CASSANDRA-6803)
   * Fix saving triggers to schema (CASSANDRA-6789)
   * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790)
   * Fix accounting in FileCacheService to allow re-using RAR (CASSANDRA-6838)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6446569/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Schema.java
index 01be68a,0907177..69914bb
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@@ -28,8 -31,10 +30,9 @@@ import org.slf4j.LoggerFactory
  
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.db.marshal.AbstractType;
  import org.apache.cassandra.io.sstable.Descriptor;
  import org.apache.cassandra.service.MigrationManager;
+ import org.apache.cassandra.utils.ConcurrentBiMap;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.Pair;
  import org.cliffc.high_scale_lib.NonBlockingHashMap;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6446569/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 9caba6d,5b8bcfa..9660b8d
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -17,8 -17,9 +17,7 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 -import java.io.DataOutputStream;
  import java.io.File;
- import java.io.FileDescriptor;
 -import java.io.FileOutputStream;
  import java.io.IOException;
  import java.io.RandomAccessFile;
  import java.nio.ByteBuffer;
@@@ -27,18 -28,13 +26,16 @@@ import java.nio.channels.FileChannel
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.Comparator;
+ import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
  import java.util.Map;
  import java.util.UUID;
  import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
  import java.util.concurrent.atomic.AtomicInteger;
 -import java.util.zip.Checksum;
  
- import org.apache.cassandra.utils.CLibrary;
- import org.apache.cassandra.utils.concurrent.OpOrder;
- 
 +import org.cliffc.high_scale_lib.NonBlockingHashMap;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -46,14 -42,16 +43,16 @@@ import org.apache.cassandra.config.CFMe
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.ColumnFamily;
 -import org.apache.cassandra.db.RowMutation;
 +import org.apache.cassandra.db.Mutation;
  import org.apache.cassandra.io.FSWriteError;
 -import org.apache.cassandra.io.util.ByteBufferOutputStream;
 -import org.apache.cassandra.io.util.ChecksummedOutputStream;
  import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.utils.CLibrary;
  import org.apache.cassandra.utils.PureJavaCrc32;
++import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.WaitQueue;
  
  /*
 - * A single commit log file on disk. Manages creation of the file and writing 
row mutations to disk,
 + * A single commit log file on disk. Manages creation of the file and writing 
mutations to disk,
   * as well as tracking the last mutation position of any "dirty" CFs covered 
by the segment file. Segment
   * files are initially allocated to a fixed size and can grow to accomidate a 
larger value if necessary.
   */
@@@ -427,94 -358,30 +426,94 @@@ public class CommitLogSegmen
       * @param cfId    the column family ID that is now clean
       * @param context the optional clean offset
       */
-     public void markClean(UUID cfId, ReplayPosition context)
+     public synchronized void markClean(UUID cfId, ReplayPosition context)
      {
 -        Integer lastWritten = cfLastWrite.get(cfId);
 +        if (!cfDirty.containsKey(cfId))
 +            return;
 +        if (context.segment == id)
 +            markClean(cfId, context.position);
 +        else if (context.segment > id)
 +            markClean(cfId, Integer.MAX_VALUE);
 +    }
 +
 +    private void markClean(UUID cfId, int position)
 +    {
 +        ensureAtleast(cfClean, cfId, position);
 +        removeCleanFromDirty();
 +    }
 +
 +    private static void ensureAtleast(ConcurrentMap<UUID, AtomicInteger> map, 
UUID cfId, int value)
 +    {
 +        AtomicInteger i = map.get(cfId);
 +        if (i == null)
 +        {
 +            AtomicInteger i2 = map.putIfAbsent(cfId, i = new AtomicInteger());
 +            if (i2 != null)
 +                i = i2;
 +        }
 +        while (true)
 +        {
 +            int cur = i.get();
 +            if (cur > value)
 +                break;
 +            if (i.compareAndSet(cur, value))
 +                break;
 +        }
 +    }
 +
 +    private void removeCleanFromDirty()
 +    {
 +        // if we're still allocating from this segment, don't touch anything 
since it can't be done thread-safely
 +        if (isStillAllocating())
 +            return;
  
 -        if (lastWritten != null && (!contains(context) || lastWritten < 
context.position))
 +        Iterator<Map.Entry<UUID, AtomicInteger>> iter = 
cfClean.entrySet().iterator();
 +        while (iter.hasNext())
          {
 -            cfLastWrite.remove(cfId);
 +            Map.Entry<UUID, AtomicInteger> clean = iter.next();
 +            UUID cfId = clean.getKey();
 +            AtomicInteger cleanPos = clean.getValue();
 +            AtomicInteger dirtyPos = cfDirty.get(cfId);
 +            if (dirtyPos != null && dirtyPos.intValue() < cleanPos.intValue())
 +            {
 +                cfDirty.remove(cfId);
 +                iter.remove();
 +            }
          }
      }
  
      /**
       * @return a collection of dirty CFIDs for this segment file.
       */
-     public Collection<UUID> getDirtyCFIDs()
+     public synchronized Collection<UUID> getDirtyCFIDs()
      {
 -        return new ArrayList<>(cfLastWrite.keySet());
 +        removeCleanFromDirty();
 +        if (cfClean.isEmpty() || cfDirty.isEmpty())
 +            return cfDirty.keySet();
 +        List<UUID> r = new ArrayList<>(cfDirty.size());
 +        for (Map.Entry<UUID, AtomicInteger> dirty : cfDirty.entrySet())
 +        {
 +            UUID cfId = dirty.getKey();
 +            AtomicInteger dirtyPos = dirty.getValue();
 +            AtomicInteger cleanPos = cfClean.get(cfId);
 +            if (cleanPos == null || cleanPos.intValue() < dirtyPos.intValue())
 +                r.add(dirty.getKey());
 +        }
 +        return r;
      }
  
      /**
       * @return true if this segment is unused and safe to recycle or delete
       */
-     public boolean isUnused()
+     public synchronized boolean isUnused()
      {
 -        return cfLastWrite.isEmpty();
 +        // if room to allocate, we're still in use as the active 
allocatingFrom,
 +        // so we don't want to race with updates to cfClean with 
removeCleanFromDirty
 +        if (isStillAllocating())
 +            return false;
 +
 +        removeCleanFromDirty();
 +        return cfDirty.isEmpty();
      }
  
      /**

Reply via email to