Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 4012134f6 -> 9f7ab09f7 refs/heads/trunk 54956e984 -> 9ba900dcf
Fix streaming not holding ref when stream error also fix crc validation error by not streaming early opened SSTable. patch by yukim; reviewed by benedict for CASSANDRA-9295 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f7ab09f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f7ab09f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f7ab09f Branch: refs/heads/cassandra-2.1 Commit: 9f7ab09f733659c94e918db03d72e6a860d654b4 Parents: 4012134 Author: Yuki Morishita <yu...@apache.org> Authored: Wed May 13 09:04:41 2015 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed May 13 09:07:25 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 25 ---------- .../cassandra/io/sstable/SSTableLoader.java | 2 +- .../cassandra/streaming/StreamSession.java | 42 +++++++++++++---- .../cassandra/streaming/StreamTransferTask.java | 12 ++--- .../streaming/messages/OutgoingFileMessage.java | 49 ++++++++++++++------ .../apache/cassandra/utils/concurrent/Refs.java | 2 +- .../cassandra/io/sstable/LegacySSTableTest.java | 2 +- .../streaming/StreamTransferTaskTest.java | 2 +- .../streaming/StreamingTransferTest.java | 2 +- 10 files changed, 78 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7cb0dfd..033d75d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -21,6 +21,7 @@ * Use higher timeout for prepair and snapshot in repair (CASSANDRA-9261) * Fix anticompaction blocking ANTI_ENTROPY stage (CASSANDRA-9151) * Repair waits for anticompaction to finish (CASSANDRA-9097) + * Fix streaming not holding ref when stream error (CASSANDRA-9295) Merged from 2.0: * Fix counting of tombstones for TombstoneOverwhelmingException (CASSANDRA-9299) * Fix ReconnectableSnitch reconnecting to peers during upgrade (CASSANDRA-6702) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 41ceb50..978037e 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1874,31 +1874,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean }; } - /** - * @return a ViewFragment containing the sstables and memtables that may need to be merged - * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree. - */ - public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired) - { - return new Function<DataTracker.View, List<SSTableReader>>() - { - public List<SSTableReader> apply(DataTracker.View view) - { - Set<SSTableReader> sstables = Sets.newHashSet(); - for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection) - { - for (SSTableReader sstable : view.sstablesInBounds(rowBounds)) - { - if (includeRepaired || !sstable.isRepaired()) - sstables.add(sstable); - } - } - - return ImmutableList.copyOf(sstables); - } - }; - } - public List<String> getSSTablesForKey(String key) { DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index b66f8dc..249c084 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -129,7 +129,7 @@ public class SSTableLoader implements StreamEventHandler Ref ref = sstable.tryRef(); if (ref == null) throw new IllegalStateException("Could not acquire ref for "+sstable); - StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(sstable, ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE); + StreamSession.SSTableStreamingSections details = new StreamSession.SSTableStreamingSections(ref, sstableSections, estimatedKeys, ActiveRepairService.UNREPAIRED_SSTABLE); streamingDetails.put(endpoint, details); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index a9f5075..a316d12 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -24,12 +24,16 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; + +import com.google.common.base.Function; import com.google.common.collect.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataTracker; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.dht.AbstractBounds; @@ -45,7 +49,6 @@ import org.apache.cassandra.streaming.messages.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.concurrent.RefCounted; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.concurrent.Refs; @@ -291,17 +294,38 @@ public class StreamSession implements IEndpointStateChangeSubscriber return stores; } - private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, boolean isIncremental) + private List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, long overriddenRepairedAt, final boolean isIncremental) { Refs<SSTableReader> refs = new Refs<>(); try { for (ColumnFamilyStore cfStore : stores) { - List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size()); + final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size()); for (Range<Token> range : ranges) rowBoundsList.add(range.toRowBounds()); - refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, !isIncremental)).refs); + refs.addAll(cfStore.selectAndReference(new Function<DataTracker.View, List<SSTableReader>>() + { + public List<SSTableReader> apply(DataTracker.View view) + { + List<SSTableReader> filteredSSTables = ColumnFamilyStore.CANONICAL_SSTABLES.apply(view); + Set<SSTableReader> sstables = Sets.newHashSet(); + if (filteredSSTables != null) + { + for (AbstractBounds<RowPosition> rowBounds : rowBoundsList) + { + // sstableInBounds may contain early opened sstables + for (SSTableReader sstable : view.sstablesInBounds(rowBounds)) + { + if (filteredSSTables.contains(sstable) && (!isIncremental || !sstable.isRepaired())) + sstables.add(sstable); + } + } + } + + return ImmutableList.copyOf(sstables); + } + }).refs); } List<SSTableStreamingSections> sections = new ArrayList<>(refs.size()); @@ -310,7 +334,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber long repairedAt = overriddenRepairedAt; if (overriddenRepairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) repairedAt = sstable.getSSTableMetadata().repairedAt; - sections.add(new SSTableStreamingSections(sstable, refs.get(sstable), + sections.add(new SSTableStreamingSections(refs.get(sstable), sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges), repairedAt)); @@ -338,29 +362,27 @@ public class StreamSession implements IEndpointStateChangeSubscriber continue; } - UUID cfId = details.sstable.metadata.cfId; + UUID cfId = details.ref.get().metadata.cfId; StreamTransferTask task = transfers.get(cfId); if (task == null) { task = new StreamTransferTask(this, cfId); transfers.put(cfId, task); } - task.addTransferFile(details.sstable, details.ref, details.estimatedKeys, details.sections, details.repairedAt); + task.addTransferFile(details.ref, details.estimatedKeys, details.sections, details.repairedAt); iter.remove(); } } public static class SSTableStreamingSections { - public final SSTableReader sstable; public final Ref<SSTableReader> ref; public final List<Pair<Long, Long>> sections; public final long estimatedKeys; public final long repairedAt; - public SSTableStreamingSections(SSTableReader sstable, Ref ref, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt) + public SSTableStreamingSections(Ref<SSTableReader> ref, List<Pair<Long, Long>> sections, long estimatedKeys, long repairedAt) { - this.sstable = sstable; this.ref = ref; this.sections = sections; this.estimatedKeys = estimatedKeys; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/streaming/StreamTransferTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java index 3add478..1727bae 100644 --- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java @@ -52,10 +52,10 @@ public class StreamTransferTask extends StreamTask super(session, cfId); } - public synchronized void addTransferFile(SSTableReader sstable, Ref ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) + public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) { - assert sstable != null && cfId.equals(sstable.metadata.cfId); - OutgoingFileMessage message = new OutgoingFileMessage(sstable, ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt); + assert ref.get() != null && cfId.equals(ref.get().metadata.cfId); + OutgoingFileMessage message = new OutgoingFileMessage(ref, sequenceNumber.getAndIncrement(), estimatedKeys, sections, repairedAt); files.put(message.header.sequenceNumber, message); totalSize += message.header.size(); } @@ -76,7 +76,7 @@ public class StreamTransferTask extends StreamTask OutgoingFileMessage file = files.remove(sequenceNumber); if (file != null) - file.ref.release(); + file.complete(); signalComplete = files.isEmpty(); } @@ -101,7 +101,7 @@ public class StreamTransferTask extends StreamTask { try { - file.ref.release(); + file.complete(); } catch (Throwable t) { @@ -127,7 +127,7 @@ public class StreamTransferTask extends StreamTask public synchronized Collection<OutgoingFileMessage> getFileMessages() { // We may race between queuing all those messages and the completion of the completion of - // the first ones. So copy tthe values to avoid a ConcurrentModificationException + // the first ones. So copy the values to avoid a ConcurrentModificationException return new ArrayList<>(files.values()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java index 069e97f..082e306 100644 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataOutputStreamAndChannel; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamWriter; @@ -47,29 +46,23 @@ public class OutgoingFileMessage extends StreamMessage public void serialize(OutgoingFileMessage message, DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException { - FileMessageHeader.serializer.serialize(message.header, out, version); - - final SSTableReader reader = message.sstable; - StreamWriter writer = message.header.compressionInfo == null ? - new StreamWriter(reader, message.header.sections, session) : - new CompressedStreamWriter(reader, - message.header.sections, - message.header.compressionInfo, session); - writer.write(out.getChannel()); + message.serialize(out, version, session); session.fileSent(message.header); } }; public final FileMessageHeader header; - public final SSTableReader sstable; - public final Ref<SSTableReader> ref; + private final Ref<SSTableReader> ref; + private final String filename; + private boolean completed = false; - public OutgoingFileMessage(SSTableReader sstable, Ref ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) + public OutgoingFileMessage(Ref<SSTableReader> ref, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) { super(Type.FILE); - this.sstable = sstable; this.ref = ref; + SSTableReader sstable = ref.get(); + filename = sstable.getFilename(); CompressionInfo compressionInfo = null; if (sstable.compression) { @@ -85,10 +78,36 @@ public class OutgoingFileMessage extends StreamMessage repairedAt); } + public synchronized void serialize(DataOutputStreamAndChannel out, int version, StreamSession session) throws IOException + { + if (completed) + { + return; + } + + FileMessageHeader.serializer.serialize(header, out, version); + + final SSTableReader reader = ref.get(); + StreamWriter writer = header.compressionInfo == null ? + new StreamWriter(reader, header.sections, session) : + new CompressedStreamWriter(reader, header.sections, + header.compressionInfo, session); + writer.write(out.getChannel()); + } + + public synchronized void complete() + { + if (!completed) + { + completed = true; + ref.release(); + } + } + @Override public String toString() { - return "File (" + header + ", file: " + sstable.getFilename() + ")"; + return "File (" + header + ", file: " + filename + ")"; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/src/java/org/apache/cassandra/utils/concurrent/Refs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java index b24fc2f..1c6486e 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Refs.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java @@ -59,7 +59,7 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i * @param referenced the object we have a Ref to * @return the Ref to said object */ - public Ref get(T referenced) + public Ref<T> get(T referenced) { return references.get(referenced); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index 19ba274..51d695f 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -118,7 +118,7 @@ public class LegacySSTableTest extends SchemaLoader ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100")))); ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken())); ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>(); - details.add(new StreamSession.SSTableStreamingSections(sstable, sstable.ref(), + details.add(new StreamSession.SSTableStreamingSections(sstable.ref(), sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt)); new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 1447b29..306afc0 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -64,7 +64,7 @@ public class StreamTransferTaskTest extends SchemaLoader { List<Range<Token>> ranges = new ArrayList<>(); ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); - task.addTransferFile(sstable, sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges), 0); + task.addTransferFile(sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges), 0); } assertEquals(2, task.getTotalNumberOfFiles()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f7ab09f/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index a5112b7..06ebdd3 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -212,7 +212,7 @@ public class StreamingTransferTest extends SchemaLoader ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>(); for (SSTableReader sstable : sstables) { - details.add(new StreamSession.SSTableStreamingSections(sstable, sstables.get(sstable), + details.add(new StreamSession.SSTableStreamingSections(sstables.get(sstable), sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt)); }