http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java deleted file mode 100644 index 13a3358..0000000 --- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.streaming.messages; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -import org.apache.cassandra.db.SerializationHeader; -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.sstable.format.SSTableFormat; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.CompactEndpointSerializationHelper; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.streaming.compress.CompressionInfo; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.UUIDSerializer; - -/** - * StreamingFileHeader is appended before sending actual data to describe what it's sending. - */ -public class FileMessageHeader -{ - public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer(); - - public final TableId tableId; - public UUID planId; - public int sessionIndex; - public final int sequenceNumber; - /** SSTable version */ - public final Version version; - - /** SSTable format **/ - public final SSTableFormat.Type format; - public final long estimatedKeys; - public final List<Pair<Long, Long>> sections; - /** - * Compression info for SSTable to send. Can be null if SSTable is not compressed. - * On sender, this field is always null to avoid holding large number of Chunks. - * Use compressionMetadata instead. - */ - public final CompressionInfo compressionInfo; - private final CompressionMetadata compressionMetadata; - public final long repairedAt; - public final UUID pendingRepair; - public final int sstableLevel; - public final SerializationHeader.Component header; - public final InetAddressAndPort sender; - - /* cached size value */ - private transient final long size; - - private FileMessageHeader(TableId tableId, - InetAddressAndPort sender, - UUID planId, - int sessionIndex, - int sequenceNumber, - Version version, - SSTableFormat.Type format, - long estimatedKeys, - List<Pair<Long, Long>> sections, - CompressionInfo compressionInfo, - long repairedAt, - UUID pendingRepair, - int sstableLevel, - SerializationHeader.Component header) - { - this.tableId = tableId; - this.sender = sender; - this.planId = planId; - this.sessionIndex = sessionIndex; - this.sequenceNumber = sequenceNumber; - this.version = version; - this.format = format; - this.estimatedKeys = estimatedKeys; - this.sections = sections; - this.compressionInfo = compressionInfo; - this.compressionMetadata = null; - this.repairedAt = repairedAt; - this.pendingRepair = pendingRepair; - this.sstableLevel = sstableLevel; - this.header = header; - this.size = calculateSize(); - } - - public FileMessageHeader(TableId tableId, - InetAddressAndPort sender, - UUID planId, - int sessionIndex, - int sequenceNumber, - Version version, - SSTableFormat.Type format, - long estimatedKeys, - List<Pair<Long, Long>> sections, - CompressionMetadata compressionMetadata, - long repairedAt, - UUID pendingRepair, - int sstableLevel, - SerializationHeader.Component header) - { - this.tableId = tableId; - this.sender = sender; - this.planId = planId; - this.sessionIndex = sessionIndex; - this.sequenceNumber = sequenceNumber; - this.version = version; - this.format = format; - this.estimatedKeys = estimatedKeys; - this.sections = sections; - this.compressionInfo = null; - this.compressionMetadata = compressionMetadata; - this.repairedAt = repairedAt; - this.pendingRepair = pendingRepair; - this.sstableLevel = sstableLevel; - this.header = header; - this.size = calculateSize(); - } - - public boolean isCompressed() - { - return compressionInfo != null || compressionMetadata != null; - } - - /** - * @return total file size to transfer in bytes - */ - public long size() - { - return size; - } - - private long calculateSize() - { - long transferSize = 0; - if (compressionInfo != null) - { - // calculate total length of transferring chunks - for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) - transferSize += chunk.length + 4; // 4 bytes for CRC - } - else if (compressionMetadata != null) - { - transferSize = compressionMetadata.getTotalSizeForSections(sections); - } - else - { - for (Pair<Long, Long> section : sections) - transferSize += section.right - section.left; - } - return transferSize; - } - - @Override - public String toString() - { - final StringBuilder sb = new StringBuilder("Header ("); - sb.append("tableId: ").append(tableId); - sb.append(", #").append(sequenceNumber); - sb.append(", version: ").append(version); - sb.append(", format: ").append(format); - sb.append(", estimated keys: ").append(estimatedKeys); - sb.append(", transfer size: ").append(size()); - sb.append(", compressed?: ").append(isCompressed()); - sb.append(", repairedAt: ").append(repairedAt); - sb.append(", pendingRepair: ").append(pendingRepair); - sb.append(", level: ").append(sstableLevel); - sb.append(')'); - return sb.toString(); - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - FileMessageHeader that = (FileMessageHeader) o; - return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId); - } - - @Override - public int hashCode() - { - int result = tableId.hashCode(); - result = 31 * result + sequenceNumber; - return result; - } - - public void addSessionInfo(StreamSession session) - { - planId = session.planId(); - sessionIndex = session.sessionIndex(); - } - - static class FileMessageHeaderSerializer - { - public CompressionInfo serialize(FileMessageHeader header, DataOutputPlus out, int version) throws IOException - { - header.tableId.serialize(out); - CompactEndpointSerializationHelper.streamingInstance.serialize(header.sender, out, version); - UUIDSerializer.serializer.serialize(header.planId, out, version); - out.writeInt(header.sessionIndex); - out.writeInt(header.sequenceNumber); - out.writeUTF(header.version.toString()); - out.writeUTF(header.format.name); - - out.writeLong(header.estimatedKeys); - out.writeInt(header.sections.size()); - for (Pair<Long, Long> section : header.sections) - { - out.writeLong(section.left); - out.writeLong(section.right); - } - // construct CompressionInfo here to avoid holding large number of Chunks on heap. - CompressionInfo compressionInfo = null; - if (header.compressionMetadata != null) - compressionInfo = new CompressionInfo(header.compressionMetadata.getChunksForSections(header.sections), header.compressionMetadata.parameters); - CompressionInfo.serializer.serialize(compressionInfo, out, version); - out.writeLong(header.repairedAt); - out.writeBoolean(header.pendingRepair != null); - if (header.pendingRepair != null) - { - UUIDSerializer.serializer.serialize(header.pendingRepair, out, version); - } - out.writeInt(header.sstableLevel); - - SerializationHeader.serializer.serialize(header.version, header.header, out); - return compressionInfo; - } - - public FileMessageHeader deserialize(DataInputPlus in, int version) throws IOException - { - TableId tableId = TableId.deserialize(in); - InetAddressAndPort sender = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version); - UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); - int sessionIndex = in.readInt(); - int sequenceNumber = in.readInt(); - Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF()); - SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF()); - - long estimatedKeys = in.readLong(); - int count = in.readInt(); - List<Pair<Long, Long>> sections = new ArrayList<>(count); - for (int k = 0; k < count; k++) - sections.add(Pair.create(in.readLong(), in.readLong())); - CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); - long repairedAt = in.readLong(); - UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null; - int sstableLevel = in.readInt(); - SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); - - return new FileMessageHeader(tableId, sender, planId, sessionIndex, sequenceNumber, sstableVersion, format, estimatedKeys, sections, compressionInfo, repairedAt, pendingRepair, sstableLevel, header); - } - - public long serializedSize(FileMessageHeader header, int version) - { - long size = header.tableId.serializedSize(); - size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(header.sender, version); - size += UUIDSerializer.serializer.serializedSize(header.planId, version); - size += TypeSizes.sizeof(header.sessionIndex); - size += TypeSizes.sizeof(header.sequenceNumber); - size += TypeSizes.sizeof(header.version.toString()); - size += TypeSizes.sizeof(header.format.name); - size += TypeSizes.sizeof(header.estimatedKeys); - - size += TypeSizes.sizeof(header.sections.size()); - for (Pair<Long, Long> section : header.sections) - { - size += TypeSizes.sizeof(section.left); - size += TypeSizes.sizeof(section.right); - } - size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version); - size += TypeSizes.sizeof(header.repairedAt); - size += TypeSizes.sizeof(header.pendingRepair != null); - size += header.pendingRepair != null ? UUIDSerializer.serializer.serializedSize(header.pendingRepair, version) : 0; - size += TypeSizes.sizeof(header.sstableLevel); - - size += SerializationHeader.serializer.serializedSize(header.version, header.header); - - return size; - } - } -}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java deleted file mode 100644 index 9f43982..0000000 --- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.streaming.messages; - -import java.io.IOException; - -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.io.sstable.SSTableMultiWriter; -import org.apache.cassandra.io.util.DataInputPlus; - -import org.apache.cassandra.io.util.DataOutputStreamPlus; -import org.apache.cassandra.streaming.StreamManager; -import org.apache.cassandra.streaming.StreamReader; -import org.apache.cassandra.streaming.StreamReceiveException; -import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.streaming.compress.CompressedStreamReader; -import org.apache.cassandra.utils.JVMStabilityInspector; - - -/** - * IncomingFileMessage is used to receive the part(or whole) of a SSTable data file. - */ -public class IncomingFileMessage extends StreamMessage -{ - public static Serializer<IncomingFileMessage> serializer = new Serializer<IncomingFileMessage>() - { - @SuppressWarnings("resource") - public IncomingFileMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException - { - FileMessageHeader header = FileMessageHeader.serializer.deserialize(input, version); - session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex); - if (session == null) - throw new IllegalStateException(String.format("unknown stream session: %s - %d", header.planId, header.sessionIndex)); - ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId); - if (cfs == null) - throw new StreamReceiveException(session, "CF " + header.tableId + " was dropped during streaming"); - - StreamReader reader = !header.isCompressed() ? new StreamReader(header, session) - : new CompressedStreamReader(header, session); - - try - { - return new IncomingFileMessage(reader.read(input), header); - } - catch (Throwable t) - { - JVMStabilityInspector.inspectThrowable(t); - throw new StreamReceiveException(session, t); - } - } - - public void serialize(IncomingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) - { - throw new UnsupportedOperationException("Not allowed to call serialize on an incoming file"); - } - - public long serializedSize(IncomingFileMessage message, int version) - { - throw new UnsupportedOperationException("Not allowed to call serializedSize on an incoming file"); - } - }; - - public FileMessageHeader header; - public SSTableMultiWriter sstable; - - public IncomingFileMessage(SSTableMultiWriter sstable, FileMessageHeader header) - { - super(Type.FILE); - this.header = header; - this.sstable = sstable; - } - - @Override - public String toString() - { - String filename = sstable != null ? sstable.getFilename() : null; - return "File (" + header + ", file: " + filename + ")"; - } -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java new file mode 100644 index 0000000..e17c3ab --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingStreamMessage.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming.messages; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.io.util.DataInputPlus; + +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.streaming.IncomingStream; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamReceiveException; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.JVMStabilityInspector; + +public class IncomingStreamMessage extends StreamMessage +{ + public static Serializer<IncomingStreamMessage> serializer = new Serializer<IncomingStreamMessage>() + { + @SuppressWarnings("resource") + public IncomingStreamMessage deserialize(DataInputPlus input, int version, StreamSession session) throws IOException + { + StreamMessageHeader header = StreamMessageHeader.serializer.deserialize(input, version); + session = StreamManager.instance.findSession(header.sender, header.planId, header.sessionIndex); + if (session == null) + throw new IllegalStateException(String.format("unknown stream session: %s - %d", header.planId, header.sessionIndex)); + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(header.tableId); + if (cfs == null) + throw new StreamReceiveException(session, "CF " + header.tableId + " was dropped during streaming"); + + IncomingStream incomingData = cfs.getStreamManager().prepareIncomingStream(session, header); + incomingData.read(input, version); + + try + { + return new IncomingStreamMessage(incomingData, header); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + throw new StreamReceiveException(session, t); + } + } + + public void serialize(IncomingStreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) + { + throw new UnsupportedOperationException("Not allowed to call serialize on an incoming stream"); + } + + public long serializedSize(IncomingStreamMessage message, int version) + { + throw new UnsupportedOperationException("Not allowed to call serializedSize on an incoming stream"); + } + }; + + public StreamMessageHeader header; + public IncomingStream stream; + + public IncomingStreamMessage(IncomingStream stream, StreamMessageHeader header) + { + super(Type.STREAM); + this.stream = stream; + this.header = header; + } + + @Override + public String toString() + { + return "IncomingStreamMessage{" + + "header=" + header + + ", stream=" + stream + + '}'; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IncomingStreamMessage that = (IncomingStreamMessage) o; + return Objects.equals(header, that.header) && + Objects.equals(stream, that.stream); + } + + public int hashCode() + { + + return Objects.hash(header, stream); + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java deleted file mode 100644 index 8bbcc05..0000000 --- a/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.streaming.messages; - -import java.io.IOException; -import java.util.List; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputStreamPlus; -import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.streaming.StreamWriter; -import org.apache.cassandra.streaming.compress.CompressedStreamWriter; -import org.apache.cassandra.streaming.compress.CompressionInfo; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.concurrent.Ref; - -/** - * OutgoingFileMessage is used to transfer the part(or whole) of a SSTable data file. - */ -public class OutgoingFileMessage extends StreamMessage -{ - public static Serializer<OutgoingFileMessage> serializer = new Serializer<OutgoingFileMessage>() - { - public OutgoingFileMessage deserialize(DataInputPlus in, int version, StreamSession session) - { - throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing file"); - } - - public void serialize(OutgoingFileMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException - { - message.startTransfer(); - try - { - message.serialize(out, version, session); - session.fileSent(message.header); - } - finally - { - message.finishTransfer(); - } - } - - public long serializedSize(OutgoingFileMessage message, int version) - { - return 0; - } - }; - - public final FileMessageHeader header; - private final Ref<SSTableReader> ref; - private final String filename; - private boolean completed = false; - private boolean transferring = false; - - public OutgoingFileMessage(Ref<SSTableReader> ref, StreamSession session, int sequenceNumber, long estimatedKeys, List<Pair<Long, Long>> sections, boolean keepSSTableLevel) - { - super(Type.FILE); - this.ref = ref; - - SSTableReader sstable = ref.get(); - filename = sstable.getFilename(); - this.header = new FileMessageHeader(sstable.metadata().id, - FBUtilities.getBroadcastAddressAndPort(), - session.planId(), - session.sessionIndex(), - sequenceNumber, - sstable.descriptor.version, - sstable.descriptor.formatType, - estimatedKeys, - sections, - sstable.compression ? sstable.getCompressionMetadata() : null, - sstable.getRepairedAt(), - sstable.getPendingRepair(), - keepSSTableLevel ? sstable.getSSTableLevel() : 0, - sstable.header.toComponent()); - } - - public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException - { - if (completed) - { - return; - } - - CompressionInfo compressionInfo = FileMessageHeader.serializer.serialize(header, out, version); - out.flush(); - final SSTableReader reader = ref.get(); - StreamWriter writer = compressionInfo == null ? - new StreamWriter(reader, header.sections, session) : - new CompressedStreamWriter(reader, header.sections, - compressionInfo, session); - writer.write(out); - } - - @VisibleForTesting - public synchronized void finishTransfer() - { - transferring = false; - //session was aborted mid-transfer, now it's safe to release - if (completed) - { - ref.release(); - } - } - - @VisibleForTesting - public synchronized void startTransfer() - { - if (completed) - throw new RuntimeException(String.format("Transfer of file %s already completed or aborted (perhaps session failed?).", - filename)); - transferring = true; - } - - public synchronized void complete() - { - if (!completed) - { - completed = true; - //release only if not transferring - if (!transferring) - { - ref.release(); - } - } - } - - @Override - public String toString() - { - return "File (" + header + ", file: " + filename + ")"; - } - - public String getFilename() - { - return filename; - } -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java new file mode 100644 index 0000000..263aabd --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/messages/OutgoingStreamMessage.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming.messages; + +import java.io.IOException; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.OutgoingStream; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +public class OutgoingStreamMessage extends StreamMessage +{ + public static Serializer<OutgoingStreamMessage> serializer = new Serializer<OutgoingStreamMessage>() + { + public OutgoingStreamMessage deserialize(DataInputPlus in, int version, StreamSession session) + { + throw new UnsupportedOperationException("Not allowed to call deserialize on an outgoing stream"); + } + + public void serialize(OutgoingStreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException + { + message.startTransfer(); + try + { + message.serialize(out, version, session); + session.streamSent(message); + } + finally + { + message.finishTransfer(); + } + } + + public long serializedSize(OutgoingStreamMessage message, int version) + { + return 0; + } + }; + + public final StreamMessageHeader header; + private final TableId tableId; + public final OutgoingStream stream; + private boolean completed = false; + private boolean transferring = false; + + public OutgoingStreamMessage(TableId tableId, StreamSession session, OutgoingStream stream, int sequenceNumber) + { + super(Type.STREAM); + this.tableId = tableId; + + this.stream = stream; + this.header = new StreamMessageHeader(tableId, + FBUtilities.getBroadcastAddressAndPort(), + session.planId(), + session.sessionIndex(), + sequenceNumber, + stream.getRepairedAt(), + stream.getPendingRepair()); + } + + public synchronized void serialize(DataOutputStreamPlus out, int version, StreamSession session) throws IOException + { + if (completed) + { + return; + } + StreamMessageHeader.serializer.serialize(header, out, version); + stream.write(session, out, version); + } + + @VisibleForTesting + public synchronized void finishTransfer() + { + transferring = false; + //session was aborted mid-transfer, now it's safe to release + if (completed) + { + stream.finish(); + } + } + + @VisibleForTesting + public synchronized void startTransfer() + { + if (completed) + throw new RuntimeException(String.format("Transfer of stream %s already completed or aborted (perhaps session failed?).", + stream)); + transferring = true; + } + + public synchronized void complete() + { + if (!completed) + { + completed = true; + //release only if not transferring + if (!transferring) + { + stream.finish(); + } + } + } + + @Override + public String toString() + { + return "OutgoingStreamMessage{" + + "header=" + header + + ", stream=" + stream + + '}'; + } + + public String getName() + { + return stream.getName(); + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index fced133..fbd3e21 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -44,18 +44,16 @@ public class StreamInitMessage extends StreamMessage public final UUID planId; public final StreamOperation streamOperation; - public final boolean keepSSTableLevel; public final UUID pendingRepair; public final PreviewKind previewKind; - public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) + public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind) { super(Type.STREAM_INIT); this.from = from; this.sessionIndex = sessionIndex; this.planId = planId; this.streamOperation = streamOperation; - this.keepSSTableLevel = keepSSTableLevel; this.pendingRepair = pendingRepair; this.previewKind = previewKind; } @@ -77,7 +75,6 @@ public class StreamInitMessage extends StreamMessage out.writeInt(message.sessionIndex); UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version); out.writeUTF(message.streamOperation.getDescription()); - out.writeBoolean(message.keepSSTableLevel); out.writeBoolean(message.pendingRepair != null); if (message.pendingRepair != null) @@ -93,11 +90,10 @@ public class StreamInitMessage extends StreamMessage int sessionIndex = in.readInt(); UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); String description = in.readUTF(); - boolean keepSSTableLevel = in.readBoolean(); UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null; PreviewKind previewKind = PreviewKind.deserialize(in.readInt()); - return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), keepSSTableLevel, pendingRepair, previewKind); + return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), pendingRepair, previewKind); } public long serializedSize(StreamInitMessage message, int version) @@ -106,7 +102,6 @@ public class StreamInitMessage extends StreamMessage size += TypeSizes.sizeof(message.sessionIndex); size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version); size += TypeSizes.sizeof(message.streamOperation.getDescription()); - size += TypeSizes.sizeof(message.keepSSTableLevel); size += TypeSizes.sizeof(message.pendingRepair != null); if (message.pendingRepair != null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java index feeab05..7ab3d34 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java @@ -67,7 +67,7 @@ public abstract class StreamMessage public enum Type { PREPARE_SYN(1, 5, PrepareSynMessage.serializer), - FILE(2, 0, IncomingFileMessage.serializer, OutgoingFileMessage.serializer), + STREAM(2, 0, IncomingStreamMessage.serializer, OutgoingStreamMessage.serializer), RECEIVED(3, 4, ReceivedMessage.serializer), COMPLETE(5, 1, CompleteMessage.serializer), SESSION_FAILED(6, 5, SessionFailedMessage.serializer), http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java new file mode 100644 index 0000000..84cf3a3 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessageHeader.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming.messages; + +import java.io.IOException; +import java.util.UUID; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.UUIDSerializer; + +/** + * StreamingFileHeader is appended before sending actual data to describe what it's sending. + */ +public class StreamMessageHeader +{ + public static FileMessageHeaderSerializer serializer = new FileMessageHeaderSerializer(); + + public final TableId tableId; + public UUID planId; + public int sessionIndex; + public final int sequenceNumber; + public final long repairedAt; + public final UUID pendingRepair; + public final InetAddressAndPort sender; + + public StreamMessageHeader(TableId tableId, + InetAddressAndPort sender, + UUID planId, + int sessionIndex, + int sequenceNumber, + long repairedAt, + UUID pendingRepair) + { + this.tableId = tableId; + this.sender = sender; + this.planId = planId; + this.sessionIndex = sessionIndex; + this.sequenceNumber = sequenceNumber; + this.repairedAt = repairedAt; + this.pendingRepair = pendingRepair; + } + + @Override + public String toString() + { + final StringBuilder sb = new StringBuilder("Header ("); + sb.append("tableId: ").append(tableId); + sb.append(", #").append(sequenceNumber); + sb.append(", repairedAt: ").append(repairedAt); + sb.append(", pendingRepair: ").append(pendingRepair); + sb.append(')'); + return sb.toString(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StreamMessageHeader that = (StreamMessageHeader) o; + return sequenceNumber == that.sequenceNumber && tableId.equals(that.tableId); + } + + @Override + public int hashCode() + { + int result = tableId.hashCode(); + result = 31 * result + sequenceNumber; + return result; + } + + public void addSessionInfo(StreamSession session) + { + planId = session.planId(); + sessionIndex = session.sessionIndex(); + } + + static class FileMessageHeaderSerializer + { + public void serialize(StreamMessageHeader header, DataOutputPlus out, int version) throws IOException + { + header.tableId.serialize(out); + CompactEndpointSerializationHelper.streamingInstance.serialize(header.sender, out, version); + UUIDSerializer.serializer.serialize(header.planId, out, version); + out.writeInt(header.sessionIndex); + out.writeInt(header.sequenceNumber); + out.writeLong(header.repairedAt); + out.writeBoolean(header.pendingRepair != null); + if (header.pendingRepair != null) + { + UUIDSerializer.serializer.serialize(header.pendingRepair, out, version); + } + } + + public StreamMessageHeader deserialize(DataInputPlus in, int version) throws IOException + { + TableId tableId = TableId.deserialize(in); + InetAddressAndPort sender = CompactEndpointSerializationHelper.streamingInstance.deserialize(in, version); + UUID planId = UUIDSerializer.serializer.deserialize(in, MessagingService.current_version); + int sessionIndex = in.readInt(); + int sequenceNumber = in.readInt(); + long repairedAt = in.readLong(); + UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null; + + return new StreamMessageHeader(tableId, sender, planId, sessionIndex, sequenceNumber, repairedAt, pendingRepair); + } + + public long serializedSize(StreamMessageHeader header, int version) + { + long size = header.tableId.serializedSize(); + size += CompactEndpointSerializationHelper.streamingInstance.serializedSize(header.sender, version); + size += UUIDSerializer.serializer.serializedSize(header.planId, version); + size += TypeSizes.sizeof(header.sessionIndex); + size += TypeSizes.sizeof(header.sequenceNumber); + size += TypeSizes.sizeof(header.repairedAt); + size += TypeSizes.sizeof(header.pendingRepair != null); + size += header.pendingRepair != null ? UUIDSerializer.serializer.serializedSize(header.pendingRepair, version) : 0; + + return size; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java new file mode 100644 index 0000000..289bb0f --- /dev/null +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.util.ArrayList; + +import org.junit.Test; + +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.big.BigFormat; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.serializers.SerializationUtils; + +public class CassandraStreamHeaderTest +{ + @Test + public void serializerTest() + { + String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)"; + TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build(); + CassandraStreamHeader header = new CassandraStreamHeader(BigFormat.latestVersion, + SSTableFormat.Type.BIG, + 0, + new ArrayList<>(), + ((CompressionMetadata) null), + 0, + SerializationHeader.makeWithoutStats(metadata).toComponent()); + + SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java new file mode 100644 index 0000000..8497e71 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.DefaultConnectionFactory; +import org.apache.cassandra.streaming.OutgoingStream; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.StreamConnectionFactory; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.concurrent.Ref; + +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; + +public class CassandraStreamManagerTest +{ + private static final String KEYSPACE = null; + private String keyspace = null; + private static final String table = "tbl"; + private static final StreamConnectionFactory connectionFactory = new DefaultConnectionFactory(); + + private TableMetadata tbm; + private ColumnFamilyStore cfs; + + @BeforeClass + public static void setupClass() throws Exception + { + SchemaLoader.prepareServer(); + } + + @Before + public void createKeyspace() throws Exception + { + keyspace = String.format("ks_%s", System.currentTimeMillis()); + tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build(); + SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm); + cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id); + } + + private static StreamSession session(UUID pendingRepair) + { + try + { + return new StreamSession(StreamOperation.REPAIR, + InetAddressAndPort.getByName("127.0.0.1"), + InetAddressAndPort.getByName("127.0.0.2"), + connectionFactory, + 0, + pendingRepair, + PreviewKind.NONE); + } + catch (UnknownHostException e) + { + throw new AssertionError(e); + } + } + + private SSTableReader createSSTable(Runnable queryable) + { + Set<SSTableReader> before = cfs.getLiveSSTables(); + queryable.run(); + cfs.forceBlockingFlush(); + Set<SSTableReader> after = cfs.getLiveSSTables(); + + Set<SSTableReader> diff = Sets.difference(after, before); + return Iterables.getOnlyElement(diff); + } + + private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException + { + Descriptor descriptor = sstable.descriptor; + descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair); + sstable.reloadSSTableMetadata(); + + } + + private static Set<SSTableReader> sstablesFromStreams(Collection<OutgoingStream> streams) + { + Set<SSTableReader> sstables = new HashSet<>(); + for (OutgoingStream stream: streams) + { + Ref<SSTableReader> ref = CassandraOutgoingFile.fromStream(stream).getRef(); + sstables.add(ref.get()); + ref.release(); + } + return sstables; + } + + private Set<SSTableReader> getReadersForRange(Range<Token> range) + { + Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(NO_PENDING_REPAIR), + Collections.singleton(range), + NO_PENDING_REPAIR, + PreviewKind.NONE); + return sstablesFromStreams(streams); + } + + private Set<SSTableReader> selectReaders(UUID pendingRepair) + { + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())); + Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(pendingRepair), ranges, pendingRepair, PreviewKind.NONE); + return sstablesFromStreams(streams); + } + + @Test + public void incrementalSSTableSelection() throws Exception + { + // make 3 tables, 1 unrepaired, 2 pending repair with different repair ids, and 1 repaired + SSTableReader sstable1 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table))); + SSTableReader sstable2 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table))); + SSTableReader sstable3 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (3, 3)", keyspace, table))); + SSTableReader sstable4 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (4, 4)", keyspace, table))); + + + UUID pendingRepair = UUIDGen.getTimeUUID(); + long repairedAt = System.currentTimeMillis(); + mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair); + mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); + mutateRepaired(sstable4, repairedAt, NO_PENDING_REPAIR); + + + + // no pending repair should return all sstables + Assert.assertEquals(Sets.newHashSet(sstable1, sstable2, sstable3, sstable4), selectReaders(NO_PENDING_REPAIR)); + + // a pending repair arg should only return sstables with the same pending repair id + Assert.assertEquals(Sets.newHashSet(sstable2), selectReaders(pendingRepair)); + } + + @Test + public void testSSTableSectionsForRanges() throws Exception + { + cfs.truncateBlocking(); + + createSSTable(() -> { + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table)); + QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table)); + }); + + Collection<SSTableReader> allSSTables = cfs.getLiveSSTables(); + Assert.assertEquals(1, allSSTables.size()); + final Token firstToken = allSSTables.iterator().next().first.getToken(); + DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1); + + Set<SSTableReader> sstablesBeforeRewrite = getReadersForRange(new Range<>(firstToken, firstToken)); + Assert.assertEquals(1, sstablesBeforeRewrite.size()); + final AtomicInteger checkCount = new AtomicInteger(); + // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables + final AtomicBoolean done = new AtomicBoolean(false); + final AtomicBoolean failed = new AtomicBoolean(false); + Runnable r = new Runnable() + { + public void run() + { + while (!done.get()) + { + Range<Token> range = new Range<Token>(firstToken, firstToken); + Set<SSTableReader> sstables = getReadersForRange(range); + if (sstables.size() != 1) + failed.set(true); + checkCount.incrementAndGet(); + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS); + } + } + }; + Thread t = NamedThreadFactory.createThread(r); + try + { + t.start(); + cfs.forceMajorCompaction(); + // reset + } + finally + { + DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50); + done.set(true); + t.join(20); + } + Assert.assertFalse(failed.get()); + Assert.assertTrue(checkCount.get() >= 2); + cfs.truncateBlocking(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java index ad5f8f5..8f0a407 100644 --- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java +++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java @@ -27,6 +27,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.streaming.DefaultConnectionFactory; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.FBUtilities; @@ -51,7 +52,7 @@ public class StreamStateStoreTest Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100")); InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE); + StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, local, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE); session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf")); StreamStateStore store = new StreamStateStore(); @@ -72,7 +73,7 @@ public class StreamStateStoreTest // add different range within the same keyspace Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200")); - session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE); + session = new StreamSession(StreamOperation.BOOTSTRAP, local, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE); session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf")); session.state(StreamSession.State.COMPLETE); store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index e399c67..8dd8197 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Random; import java.util.UUID; +import com.google.common.collect.Lists; import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; @@ -42,6 +43,7 @@ import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.Verifier; +import org.apache.cassandra.db.streaming.CassandraOutgoingFile; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -52,6 +54,7 @@ import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.sstable.format.big.BigFormat; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.OutgoingStream; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.StreamOperation; @@ -248,12 +251,11 @@ public class LegacySSTableTest List<Range<Token>> ranges = new ArrayList<>(); ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("100")))); ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("100")), p.getMinimumToken())); - ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>(); - details.add(new StreamSession.SSTableStreamingSections(sstable.ref(), - sstable.getPositionsForRanges(ranges), - sstable.estimatedKeysForRanges(ranges))); - new StreamPlan(StreamOperation.OTHER).transferFiles(FBUtilities.getBroadcastAddressAndPort(), details) - .execute().get(); + List<OutgoingStream> streams = Lists.newArrayList(new CassandraOutgoingFile(StreamOperation.OTHER, + sstable.ref(), + sstable.getPositionsForRanges(ranges), + sstable.estimatedKeysForRanges(ranges))); + new StreamPlan(StreamOperation.OTHER).transferStreams(FBUtilities.getBroadcastAddressAndPort(), streams).execute().get(); } private static void truncateLegacyTables(String legacyVersion) throws Exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 53f5ab3..5c3e8c9 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -774,65 +774,6 @@ public class SSTableRewriterTest extends SSTableWriterTestBase validateCFS(cfs); } - @Test - public void testSSTableSectionsForRanges() throws IOException, InterruptedException, ExecutionException - { - Keyspace keyspace = Keyspace.open(KEYSPACE); - final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - truncate(cfs); - - cfs.addSSTable(writeFile(cfs, 1000)); - - Collection<SSTableReader> allSSTables = cfs.getLiveSSTables(); - assertEquals(1, allSSTables.size()); - final Token firstToken = allSSTables.iterator().next().first.getToken(); - DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(1); - - List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges( - Collections.singleton(new Range<Token>(firstToken, firstToken)), - Collections.singleton(cfs), null, PreviewKind.NONE); - assertEquals(1, sectionsBeforeRewrite.size()); - for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite) - section.ref.release(); - final AtomicInteger checkCount = new AtomicInteger(); - // needed since we get notified when compaction is done as well - we can't get sections for ranges for obsoleted sstables - final AtomicBoolean done = new AtomicBoolean(false); - final AtomicBoolean failed = new AtomicBoolean(false); - Runnable r = new Runnable() - { - public void run() - { - while (!done.get()) - { - Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken)); - List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), null, PreviewKind.NONE); - if (sections.size() != 1) - failed.set(true); - for (StreamSession.SSTableStreamingSections section : sections) - section.ref.release(); - checkCount.incrementAndGet(); - Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS); - } - } - }; - Thread t = NamedThreadFactory.createThread(r); - try - { - t.start(); - cfs.forceMajorCompaction(); - // reset - } - finally - { - DatabaseDescriptor.setSSTablePreempiveOpenIntervalInMB(50); - done.set(true); - t.join(20); - } - assertFalse(failed.get()); - assertTrue(checkCount.get() >= 2); - truncate(cfs); - } - /** * emulates anticompaction - writing from one source sstable to two new sstables * http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/serializers/SerializationUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/serializers/SerializationUtils.java b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java new file mode 100644 index 0000000..7ce4ec5 --- /dev/null +++ b/test/unit/org/apache/cassandra/serializers/SerializationUtils.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.serializers; + +import java.io.IOException; + +import org.junit.Assert; + +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.net.MessagingService; + +public class SerializationUtils +{ + + public static <T> T cycleSerialization(T src, IVersionedSerializer<T> serializer, int version) + { + int expectedSize = (int) serializer.serializedSize(src, version); + + try (DataOutputBuffer out = new DataOutputBuffer(expectedSize)) + { + serializer.serialize(src, out, version); + Assert.assertEquals(expectedSize, out.buffer().limit()); + try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false)) + { + return serializer.deserialize(in, version); + } + } + catch (IOException e) + { + throw new AssertionError(e); + } + } + + public static <T> T cycleSerialization(T src, IVersionedSerializer<T> serializer) + { + return cycleSerialization(src, serializer, MessagingService.current_version); + } + + public static <T> void assertSerializationCycle(T src, IVersionedSerializer<T> serializer, int version) + { + T dst = cycleSerialization(src, serializer, version); + Assert.assertEquals(src, dst); + } + + public static <T> void assertSerializationCycle(T src, IVersionedSerializer<T> serializer) + { + assertSerializationCycle(src, serializer, MessagingService.current_version); + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java deleted file mode 100644 index 6abc2a2..0000000 --- a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.streaming; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.cql3.statements.CreateTableStatement; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.utils.UUIDGen; - -public class StreamSessionTest -{ - private String keyspace = null; - private static final String table = "tbl"; - - private TableMetadata tbm; - private ColumnFamilyStore cfs; - - @BeforeClass - public static void setupClass() throws Exception - { - SchemaLoader.prepareServer(); - } - - @Before - public void createKeyspace() throws Exception - { - keyspace = String.format("ks_%s", System.currentTimeMillis()); - tbm = CreateTableStatement.parse(String.format("CREATE TABLE %s (k INT PRIMARY KEY, v INT)", table), keyspace).build(); - SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tbm); - cfs = Schema.instance.getColumnFamilyStoreInstance(tbm.id); - } - - private SSTableReader createSSTable(Runnable queryable) - { - Set<SSTableReader> before = cfs.getLiveSSTables(); - queryable.run(); - cfs.forceBlockingFlush(); - Set<SSTableReader> after = cfs.getLiveSSTables(); - - Set<SSTableReader> diff = Sets.difference(after, before); - assert diff.size() == 1 : "Expected 1 new sstable, got " + diff.size(); - return diff.iterator().next(); - } - - private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException - { - Descriptor descriptor = sstable.descriptor; - descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair); - sstable.reloadSSTableMetadata(); - - } - - private Set<SSTableReader> selectReaders(UUID pendingRepair) - { - IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); - Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())); - List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(ranges, - Lists.newArrayList(cfs), - pendingRepair, - PreviewKind.NONE); - Set<SSTableReader> sstables = new HashSet<>(); - for (StreamSession.SSTableStreamingSections section: sections) - { - sstables.add(section.ref.get()); - } - return sstables; - } - - @Test - public void incrementalSSTableSelection() throws Exception - { - // make 3 tables, 1 unrepaired, 2 pending repair with different repair ids, and 1 repaired - SSTableReader sstable1 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (1, 1)", keyspace, table))); - SSTableReader sstable2 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (2, 2)", keyspace, table))); - SSTableReader sstable3 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (3, 3)", keyspace, table))); - SSTableReader sstable4 = createSSTable(() -> QueryProcessor.executeInternal(String.format("INSERT INTO %s.%s (k, v) VALUES (4, 4)", keyspace, table))); - - - UUID pendingRepair = UUIDGen.getTimeUUID(); - long repairedAt = System.currentTimeMillis(); - mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair); - mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID()); - mutateRepaired(sstable4, repairedAt, ActiveRepairService.NO_PENDING_REPAIR); - - // no pending repair should return all sstables - Assert.assertEquals(Sets.newHashSet(sstable1, sstable2, sstable3, sstable4), selectReaders(ActiveRepairService.NO_PENDING_REPAIR)); - - // a pending repair arg should only return sstables with the same pending repair id - Assert.assertEquals(Sets.newHashSet(sstable2), selectReaders(pendingRepair)); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index ceaaae0..45c917a 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -36,6 +36,7 @@ import junit.framework.Assert; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.streaming.CassandraOutgoingFile; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; @@ -43,7 +44,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.streaming.messages.OutgoingFileMessage; +import org.apache.cassandra.streaming.messages.OutgoingStreamMessage; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Ref; @@ -75,7 +76,7 @@ public class StreamTransferTaskTest public void testScheduleTimeout() throws Exception { InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); - StreamSession session = new StreamSession(peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, true, UUID.randomUUID(), PreviewKind.ALL); + StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, UUID.randomUUID(), PreviewKind.ALL); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); // create two sstables @@ -91,7 +92,7 @@ public class StreamTransferTaskTest { List<Range<Token>> ranges = new ArrayList<>(); ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); - task.addTransferFile(sstable.selfRef(), 1, sstable.getPositionsForRanges(ranges)); + task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.selfRef(), sstable.getPositionsForRanges(ranges), 1)); } assertEquals(2, task.getTotalNumberOfFiles()); @@ -121,9 +122,9 @@ public class StreamTransferTaskTest public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception { InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort(); - StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, new DefaultConnectionFactory(), false, null, PreviewKind.NONE); + StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE); StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator); - StreamSession session = new StreamSession(peer, peer, null, 0, true, null, PreviewKind.NONE); + StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, peer, peer, null, 0, null, PreviewKind.NONE); session.init(future); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); @@ -143,7 +144,7 @@ public class StreamTransferTaskTest ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken())); Ref<SSTableReader> ref = sstable.selfRef(); refs.add(ref); - task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges)); + task.addTransferStream(new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, ref, sstable.getPositionsForRanges(ranges), 1)); } assertEquals(2, task.getTotalNumberOfFiles()); @@ -151,10 +152,10 @@ public class StreamTransferTaskTest session.transfers.put(TableId.generate(), task); //make a copy of outgoing file messages, since task is cleared when it's aborted - Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values()); + Collection<OutgoingStreamMessage> files = new LinkedList<>(task.streams.values()); //simulate start transfer - for (OutgoingFileMessage file : files) + for (OutgoingStreamMessage file : files) { file.startTransfer(); } @@ -169,7 +170,7 @@ public class StreamTransferTaskTest } //simulate finish transfer - for (OutgoingFileMessage file : files) + for (OutgoingStreamMessage file : files) { file.finishTransfer(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 16c07a0..575200a 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -34,6 +34,7 @@ import junit.framework.Assert; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.db.streaming.CassandraOutgoingFile; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; @@ -254,13 +255,13 @@ public class StreamingTransferTest private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception { - StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))); + StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferStreams(LOCAL, makeOutgoingStreams(ranges, Refs.tryRef(Arrays.asList(sstable)))); streamPlan.execute().get(); //cannot add files after stream session is finished try { - streamPlan.transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))); + streamPlan.transferStreams(LOCAL, makeOutgoingStreams(ranges, Refs.tryRef(Arrays.asList(sstable)))); fail("Should have thrown exception"); } catch (RuntimeException e) @@ -269,16 +270,22 @@ public class StreamingTransferTest } } - private Collection<StreamSession.SSTableStreamingSections> makeStreamingDetails(List<Range<Token>> ranges, Refs<SSTableReader> sstables) + private Collection<OutgoingStream> makeOutgoingStreams(StreamOperation operation, List<Range<Token>> ranges, Refs<SSTableReader> sstables) { - ArrayList<StreamSession.SSTableStreamingSections> details = new ArrayList<>(); + ArrayList<OutgoingStream> streams = new ArrayList<>(); for (SSTableReader sstable : sstables) { - details.add(new StreamSession.SSTableStreamingSections(sstables.get(sstable), - sstable.getPositionsForRanges(ranges), - sstable.estimatedKeysForRanges(ranges))); + streams.add(new CassandraOutgoingFile(operation, + sstables.get(sstable), + sstable.getPositionsForRanges(ranges), + sstable.estimatedKeysForRanges(ranges))); } - return details; + return streams; + } + + private Collection<OutgoingStream> makeOutgoingStreams(List<Range<Token>> ranges, Refs<SSTableReader> sstables) + { + return makeOutgoingStreams(StreamOperation.OTHER, ranges, sstables); } private void doTransferTable(boolean transferSSTables) throws Exception @@ -458,7 +465,7 @@ public class StreamingTransferTest // Acquiring references, transferSSTables needs it Refs<SSTableReader> refs = Refs.tryRef(Arrays.asList(sstable, sstable2)); assert refs != null; - new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get(); + new StreamPlan("StreamingTransferTest").transferStreams(LOCAL, makeOutgoingStreams(ranges, refs)).execute().get(); // confirm that the sstables were transferred and registered and that 2 keys arrived ColumnFamilyStore cfstore = Keyspace.open(keyspaceName).getColumnFamilyStore(cfname); @@ -513,7 +520,7 @@ public class StreamingTransferTest if (refs == null) throw new AssertionError(); - new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, refs)).execute().get(); + new StreamPlan("StreamingTransferTest").transferStreams(LOCAL, makeOutgoingStreams(ranges, refs)).execute().get(); // check that only two keys were transferred for (Map.Entry<DecoratedKey,String> entry : Arrays.asList(first, last)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java index 617bae1..fd22a65 100644 --- a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java +++ b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.streaming.async; -import java.net.InetSocketAddress; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -64,8 +63,8 @@ public class NettyStreamingMessageSenderTest channel = new EmbeddedChannel(); channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE); UUID pendingRepair = UUID.randomUUID(); - session = new StreamSession(REMOTE_ADDR, REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, true, pendingRepair, PreviewKind.ALL); - StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, true, pendingRepair, session.getPreviewKind()); + session = new StreamSession(StreamOperation.BOOTSTRAP, REMOTE_ADDR, REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, pendingRepair, PreviewKind.ALL); + StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, pendingRepair, session.getPreviewKind()); session.init(future); sender = session.getMessageSender(); sender.setControlMessageChannel(channel); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org