merge from 2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f6446569 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f6446569 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f6446569 Branch: refs/heads/trunk Commit: f64465693124bb198ebf8499136c610da77412ad Parents: ace0e6e bfd03f1 Author: Jonathan Ellis <jbel...@apache.org> Authored: Fri Mar 14 13:16:31 2014 -0500 Committer: Jonathan Ellis <jbel...@apache.org> Committed: Fri Mar 14 13:16:31 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/Schema.java | 7 +- .../db/commitlog/CommitLogSegment.java | 13 +- .../apache/cassandra/utils/ConcurrentBiMap.java | 131 +++++++++++++++++++ 4 files changed, 143 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6446569/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 24c8090,6483012..12dccec --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,29 -1,6 +1,30 @@@ -2.0.7 +2.1.0-beta2 + * Eliminate possibility of CL segment appearing twice in active list + (CASSANDRA-6557) + * Apply DONTNEED fadvise to commitlog segments (CASSANDRA-6759) + * Switch CRC component to Adler and include it for compressed sstables + (CASSANDRA-4165) + * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451) + * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899) + * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897) + * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573) + * Fix ABTC NPE and apply update function correctly (CASSANDRA-6692) + * Allow nodetool to use a file or prompt for password (CASSANDRA-6660) + * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742) + * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705) + * Scrub should not always clear out repaired status (CASSANDRA-5351) + * Improve handling of range tombstone for wide partitions (CASSANDRA-6446) + * Fix ClassCastException for compact table with composites (CASSANDRA-6738) + * Fix potentially repairing with wrong nodes (CASSANDRA-6808) + * Change caching option syntax (CASSANDRA-6745) + * Fix stress to do proper counter reads (CASSANDRA-6835) + * Fix help message for stress counter_write (CASSANDRA-6824) + * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848) + * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849) +Merged from 2.0: + * Fix schema concurrency exceptions (CASSANDRA-6841) * Fix leaking validator FH in StreamWriter (CASSANDRA-6832) + * fix nodetool getsstables for blob PK (CASSANDRA-6803) * Fix saving triggers to schema (CASSANDRA-6789) * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790) * Fix accounting in FileCacheService to allow re-using RAR (CASSANDRA-6838) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6446569/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/Schema.java index 01be68a,0907177..69914bb --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@@ -28,8 -31,10 +30,9 @@@ import org.slf4j.LoggerFactory import org.apache.cassandra.db.*; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.service.MigrationManager; + import org.apache.cassandra.utils.ConcurrentBiMap; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.cliffc.high_scale_lib.NonBlockingHashMap; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6446569/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 9caba6d,5b8bcfa..9660b8d --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@@ -17,8 -17,9 +17,7 @@@ */ package org.apache.cassandra.db.commitlog; -import java.io.DataOutputStream; import java.io.File; - import java.io.FileDescriptor; -import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; @@@ -27,18 -28,13 +26,16 @@@ import java.nio.channels.FileChannel import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; + import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.zip.Checksum; - import org.apache.cassandra.utils.CLibrary; - import org.apache.cassandra.utils.concurrent.OpOrder; - +import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -46,14 -42,16 +43,16 @@@ import org.apache.cassandra.config.CFMe import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.db.Mutation; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.util.ByteBufferOutputStream; -import org.apache.cassandra.io.util.ChecksummedOutputStream; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.net.MessagingService; ++import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.PureJavaCrc32; ++import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.WaitQueue; /* - * A single commit log file on disk. Manages creation of the file and writing row mutations to disk, + * A single commit log file on disk. Manages creation of the file and writing mutations to disk, * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment * files are initially allocated to a fixed size and can grow to accomidate a larger value if necessary. */ @@@ -427,94 -358,30 +426,94 @@@ public class CommitLogSegmen * @param cfId the column family ID that is now clean * @param context the optional clean offset */ - public void markClean(UUID cfId, ReplayPosition context) + public synchronized void markClean(UUID cfId, ReplayPosition context) { - Integer lastWritten = cfLastWrite.get(cfId); + if (!cfDirty.containsKey(cfId)) + return; + if (context.segment == id) + markClean(cfId, context.position); + else if (context.segment > id) + markClean(cfId, Integer.MAX_VALUE); + } + + private void markClean(UUID cfId, int position) + { + ensureAtleast(cfClean, cfId, position); + removeCleanFromDirty(); + } + + private static void ensureAtleast(ConcurrentMap<UUID, AtomicInteger> map, UUID cfId, int value) + { + AtomicInteger i = map.get(cfId); + if (i == null) + { + AtomicInteger i2 = map.putIfAbsent(cfId, i = new AtomicInteger()); + if (i2 != null) + i = i2; + } + while (true) + { + int cur = i.get(); + if (cur > value) + break; + if (i.compareAndSet(cur, value)) + break; + } + } + + private void removeCleanFromDirty() + { + // if we're still allocating from this segment, don't touch anything since it can't be done thread-safely + if (isStillAllocating()) + return; - if (lastWritten != null && (!contains(context) || lastWritten < context.position)) + Iterator<Map.Entry<UUID, AtomicInteger>> iter = cfClean.entrySet().iterator(); + while (iter.hasNext()) { - cfLastWrite.remove(cfId); + Map.Entry<UUID, AtomicInteger> clean = iter.next(); + UUID cfId = clean.getKey(); + AtomicInteger cleanPos = clean.getValue(); + AtomicInteger dirtyPos = cfDirty.get(cfId); + if (dirtyPos != null && dirtyPos.intValue() < cleanPos.intValue()) + { + cfDirty.remove(cfId); + iter.remove(); + } } } /** * @return a collection of dirty CFIDs for this segment file. */ - public Collection<UUID> getDirtyCFIDs() + public synchronized Collection<UUID> getDirtyCFIDs() { - return new ArrayList<>(cfLastWrite.keySet()); + removeCleanFromDirty(); + if (cfClean.isEmpty() || cfDirty.isEmpty()) + return cfDirty.keySet(); + List<UUID> r = new ArrayList<>(cfDirty.size()); + for (Map.Entry<UUID, AtomicInteger> dirty : cfDirty.entrySet()) + { + UUID cfId = dirty.getKey(); + AtomicInteger dirtyPos = dirty.getValue(); + AtomicInteger cleanPos = cfClean.get(cfId); + if (cleanPos == null || cleanPos.intValue() < dirtyPos.intValue()) + r.add(dirty.getKey()); + } + return r; } /** * @return true if this segment is unused and safe to recycle or delete */ - public boolean isUnused() + public synchronized boolean isUnused() { - return cfLastWrite.isEmpty(); + // if room to allocate, we're still in use as the active allocatingFrom, + // so we don't want to race with updates to cfClean with removeCleanFromDirty + if (isStillAllocating()) + return false; + + removeCleanFromDirty(); + return cfDirty.isEmpty(); } /**